From 84d099ec804d60fef95626675802a24b1e65ee54 Mon Sep 17 00:00:00 2001 From: auxten Date: Sat, 2 Nov 2024 06:26:13 +0000 Subject: [PATCH 01/36] Fix result buf copy in query_stable_v2 --- programs/local/LocalServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 4239ef0aa59..e720ec24e48 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1207,7 +1207,7 @@ local_result_v2 * query_stable_v2(int argc, char ** argv) else { // Handle successful data retrieval scenario - res->_vec = new std::vector(*result->buf_); + res->_vec = result->buf_; res->len = result->buf_->size(); res->buf = result->buf_->data(); res->rows_read = result->rows_; From 50f33a61fba3897a479674b43c8a71758d8f18a2 Mon Sep 17 00:00:00 2001 From: auxten Date: Sat, 2 Nov 2024 08:13:18 +0000 Subject: [PATCH 02/36] Impl chdb_conn connect_chdb close_conn query_conn --- programs/local/LocalChdb.cpp | 10 +++ programs/local/LocalServer.cpp | 155 ++++++++++++++++++++++++++++++++- programs/local/LocalServer.h | 7 +- programs/local/chdb.h | 10 +++ src/Client/ClientBase.cpp | 15 ++++ src/Client/ClientBase.h | 12 ++- 6 files changed, 198 insertions(+), 11 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 7ea577e91ce..c6e7bf72a82 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -191,6 +191,16 @@ PYBIND11_MODULE(_chdb, m) py::arg("path") = "", py::arg("udf_path") = "", "Query chDB and return a query_result object"); + + // Pybind wrapper for: + // struct chdb_conn + // { + // void * server; // LocalServer * server; + // bool connected; + // }; + // CHDB_EXPORT chdb_conn * connect_chdb(int argc, char ** argv); + // CHDB_EXPORT void close_conn(chdb_conn * conn); + // CHDB_EXPORT struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format); } #endif // PY_TEST_MAIN diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index e720ec24e48..20181538ee9 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -493,12 +493,12 @@ try } } - is_interactive = stdin_is_a_tty + is_interactive = !is_background && stdin_is_a_tty && (getClientConfiguration().hasOption("interactive") || (queries.empty() && !getClientConfiguration().has("table-structure") && queries_files.empty() && !getClientConfiguration().has("table-file"))); - if (!is_interactive) + if (!is_interactive && !is_background) { /// We will terminate process on error static KillingErrorHandler error_handler; @@ -523,7 +523,10 @@ try processConfig(); - SCOPE_EXIT({ cleanup(); }); + if (!is_background) + { + SCOPE_EXIT({ cleanup(); }); + } initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default"))); ASTAlterCommand::setFormatAlterCommandsWithParentheses(true); @@ -560,7 +563,11 @@ try #if defined(FUZZING_MODE) runLibFuzzer(); #else - if (is_interactive && !delayed_interactive) + if (is_background) + { + runBackground(); + } + else if (is_interactive && !delayed_interactive) { runInteractive(); } @@ -1150,6 +1157,25 @@ std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) } } +int bgClickHouseLocal(int argc, char ** argv) +{ + try + { + DB::LocalServer app; + app.is_background = true; // Set background mode + app.init(argc, argv); + return app.run(); + } + catch (const DB::Exception & e) + { + throw std::domain_error(DB::getExceptionMessage(e, false)); + } + catch (...) + { + throw std::domain_error(DB::getCurrentExceptionMessage(true)); + } +} + // todo fix the memory leak and unnecessary copy local_result * query_stable(int argc, char ** argv) { @@ -1240,6 +1266,127 @@ void free_result_v2(local_result_v2 * result) delete result; } +std::mutex connection_mutex; + +chdb_conn * connect_chdb(int argc, char ** argv) +{ + try + { + std::lock_guard lock(connection_mutex); + + // Use background mode + int ret = bgClickHouseLocal(argc, argv); + if (ret != 0) + { + throw std::domain_error("Failed to run ClickHouse background server"); + } + + auto * conn = new chdb_conn(); + auto * server = new DB::LocalServer(); + conn->server = server; + server->is_background = true; + + // Initialize server in background mode + server->init(argc, argv); + conn->connected = true; + + return conn; + } + catch (const DB::Exception & e) + { + throw std::domain_error(DB::getExceptionMessage(e, false)); + } + catch (...) + { + throw std::domain_error(DB::getCurrentExceptionMessage(true)); + } +} + + +void close_conn(chdb_conn * conn) +{ + if (!conn) + return; + + std::lock_guard lock(connection_mutex); + + if (conn->connected) + { + DB::LocalServer * server = static_cast(conn->server); + // Cleanup suggestions before context + server->cleanup(); + delete server; + } + + delete conn; +} + +struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) +{ + if (!conn || !conn->connected) + return nullptr; + + std::lock_guard lock(connection_mutex); + + try + { + auto * result = new local_result_v2{}; + DB::LocalServer * server = static_cast(conn->server); + // Set output format if specified + if (format && *format) + { + server->client_context->setDefaultFormat(format); + } + + // Check connection and reconnect if needed + if (!conn->connected) + { + server->connect(); + } + + // Execute query + if (!server->processQueryText(query)) + { + std::string error = server->get_error_msg(); + result->error_message = new char[error.length() + 1]; + std::strcpy(result->error_message, error.c_str()); + return result; + } + + // Get query results without copying + auto * output_vec = server->getQueryOutputVector(); + if (output_vec && !output_vec->empty()) + { + // Take ownership of the vector + result->_vec = std::move(output_vec); + result->buf = output_vec->data(); + result->len = output_vec->size(); + } + + result->rows_read = server->getProcessedRows(); + result->bytes_read = server->getProcessedBytes(); + result->elapsed = server->getElapsedTime(); + + return result; + } + catch (const DB::Exception & e) + { + auto * result = new local_result_v2{}; + std::string error = DB::getExceptionMessage(e, false); + result->error_message = new char[error.length() + 1]; + std::strcpy(result->error_message, error.c_str()); + return result; + } + catch (...) + { + auto * result = new local_result_v2{}; + std::string error = DB::getCurrentExceptionMessage(true); + result->error_message = new char[error.length() + 1]; + std::strcpy(result->error_message, error.c_str()); + return result; + } +} + /** * The dummy_calls function is used to prevent certain functions from being optimized out by the compiler. * It includes calls to 'query_stable' and 'free_result' within a condition that is always false. diff --git a/programs/local/LocalServer.h b/programs/local/LocalServer.h index b18a7a90961..4488299ef97 100644 --- a/programs/local/LocalServer.h +++ b/programs/local/LocalServer.h @@ -30,11 +30,13 @@ class LocalServer : public ClientApplicationBase, public Loggers int main(const std::vector & /*args*/) override; -protected: - Poco::Util::LayeredConfiguration & getClientConfiguration() override; + void cleanup(); void connect() override; +protected: + Poco::Util::LayeredConfiguration & getClientConfiguration() override; + void processError(const String & query) const override; String getName() const override { return "local"; } @@ -60,7 +62,6 @@ class LocalServer : public ClientApplicationBase, public Loggers void tryInitPath(); void setupUsers(); - void cleanup(); void applyCmdOptions(ContextMutablePtr context); void applyCmdSettings(ContextMutablePtr context); diff --git a/programs/local/chdb.h b/programs/local/chdb.h index 1f4ae3835c4..01d924e0a68 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -50,6 +50,16 @@ CHDB_EXPORT void free_result(struct local_result * result); CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv); CHDB_EXPORT void free_result_v2(struct local_result_v2 * result); +struct chdb_conn +{ + void * server; // LocalServer * server; + bool connected; +}; + +CHDB_EXPORT chdb_conn * connect_chdb(int argc, char ** argv); +CHDB_EXPORT void close_conn(chdb_conn * conn); +CHDB_EXPORT struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format); + #ifdef __cplusplus } #endif diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index ae0b526854c..441fe813620 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -2677,6 +2677,21 @@ void ClientBase::runInteractive() } +void ClientBase::runBackground() +{ + initQueryIdFormats(); + + // Initialize DateLUT here to avoid counting time spent here as query execution time. + (void)DateLUT::instance().getTimeZone(); + + if (home_path.empty()) + { + const char * home_path_cstr = getenv("HOME"); // NOLINT(concurrency-mt-unsafe) + if (home_path_cstr) + home_path = home_path_cstr; + } +} + bool ClientBase::processMultiQueryFromFile(const String & file_name) { String queries_from_file; diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 53a0d142a13..2d026e13739 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -111,10 +111,17 @@ class ClientBase ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements); + bool is_background = false; + + /// Returns true if query processing was successful. + bool processQueryText(const String & text); + protected: void runInteractive(); void runNonInteractive(); + void runBackground(); + char * argv0 = nullptr; void runLibFuzzer(); @@ -168,9 +175,6 @@ class ClientBase const std::vector & hosts_and_ports_arguments) = 0; virtual void processConfig() = 0; - /// Returns true if query processing was successful. - bool processQueryText(const String & text); - virtual void readArguments( int argc, char ** argv, @@ -264,7 +268,7 @@ class ClientBase ContextMutablePtr global_context; /// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server. - ContextMutablePtr client_context; + public: ContextMutablePtr client_context; bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool delayed_interactive = false; From d073583eab96138774b581f9a2db59be1dbc9e95 Mon Sep 17 00:00:00 2001 From: auxten Date: Sat, 2 Nov 2024 10:48:17 +0000 Subject: [PATCH 03/36] Update .clang-tidy --- .clang-tidy | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/.clang-tidy b/.clang-tidy index 219ac263ab3..d0e7b7d1371 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -27,6 +27,8 @@ Checks: [ '-bugprone-not-null-terminated-result', '-bugprone-reserved-identifier', # useful but too slow, TODO retry when https://reviews.llvm.org/rG1c282052624f9d0bd273bde0b47b30c96699c6c7 is merged '-bugprone-unchecked-optional-access', + '-bugprone-crtp-constructor-accessibility', + '-bugprone-suspicious-stringview-data-usage', '-cert-dcl16-c', '-cert-dcl37-c', @@ -36,6 +38,7 @@ Checks: [ '-cert-msc51-cpp', '-cert-oop54-cpp', '-cert-oop57-cpp', + '-cert-err33-c', # Misreports on clang-19: it warns about all functions containing 'remove' in the name, not only about the standard library. '-clang-analyzer-optin.performance.Padding', @@ -94,10 +97,12 @@ Checks: [ '-modernize-pass-by-value', '-modernize-return-braced-init-list', '-modernize-use-auto', + '-modernize-use-constraints', # This is a good check, but clang-tidy crashes, see https://github.com/llvm/llvm-project/issues/91872 '-modernize-use-default-member-init', '-modernize-use-emplace', '-modernize-use-nodiscard', '-modernize-use-trailing-return-type', + '-modernize-use-designated-initializers', '-performance-enum-size', '-performance-inefficient-string-concatenation', @@ -121,7 +126,8 @@ Checks: [ '-readability-magic-numbers', '-readability-named-parameter', '-readability-redundant-declaration', - '-readability-redundant-inline-specifier', + '-readability-redundant-inline-specifier', # useful but incompatible with __attribute((always_inline))__ (aka. ALWAYS_INLINE, base/base/defines.h). + # ALWAYS_INLINE only has an effect if combined with `inline`: https://godbolt.org/z/Eefd74qdM '-readability-redundant-member-init', # Useful but triggers another problem. Imagine a struct S with multiple String members. Structs are often instantiated via designated # initializer S s{.s1 = [...], .s2 = [...], [...]}. In this case, compiler warning `missing-field-initializers` requires to specify all members which are not in-struct # initialized (example: s1 in struct S { String s1; String s2{};}; is not in-struct initialized, therefore it must be specified at instantiation time). As explicitly @@ -132,12 +138,7 @@ Checks: [ '-readability-uppercase-literal-suffix', '-readability-use-anyofallof', - '-zircon-*', - - # This is a good check, but clang-tidy crashes, see https://github.com/llvm/llvm-project/issues/91872 - '-modernize-use-constraints', - # https://github.com/abseil/abseil-cpp/issues/1667 - '-clang-analyzer-optin.core.EnumCastOutOfRange' + '-zircon-*' ] WarningsAsErrors: '*' @@ -172,4 +173,4 @@ CheckOptions: performance-move-const-arg.CheckTriviallyCopyableMove: false # Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097 readability-identifier-naming.TypeTemplateParameterIgnoredRegexp: expr-type - cppcoreguidelines-avoid-do-while.IgnoreMacros: true + cppcoreguidelines-avoid-do-while.IgnoreMacros: true \ No newline at end of file From 61b875ca60b973c21b896e35c467563ffc6500f8 Mon Sep 17 00:00:00 2001 From: auxten Date: Sat, 2 Nov 2024 10:54:44 +0000 Subject: [PATCH 04/36] Basically works --- programs/local/LocalChdb.cpp | 91 ++++++++++++++++++++++++++-------- programs/local/LocalServer.cpp | 26 ++++------ 2 files changed, 80 insertions(+), 37 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index c6e7bf72a82..ecc35f622a2 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -2,9 +2,9 @@ #if USE_PYTHON -#include -#include -#include +# include +# include +# include extern bool inside_main = true; @@ -53,6 +53,7 @@ local_result_v2 * queryToBuffer( // Convert std::string to char* std::vector argv_char; + argv_char.reserve(argv.size()); for (auto & arg : argv) argv_char.push_back(const_cast(arg.c_str())); @@ -77,13 +78,49 @@ memoryview_wrapper * query_result::get_memview() return new memoryview_wrapper(this->result_wrapper); } -#ifdef PY_TEST_MAIN -# include -# include -# include -# include -# include -# include +// Add these new wrapper classes +class connection_wrapper +{ +private: + chdb_conn * conn; + +public: + connection_wrapper(int argc, char ** argv) + { + conn = connect_chdb(argc, argv); + if (!conn) + { + throw std::runtime_error("Failed to connect to chdb"); + } + } + + ~connection_wrapper() + { + if (conn) + { + close_conn(conn); + conn = nullptr; + } + } + + query_result * query(const std::string & query_str, const std::string & format = "CSV") + { + auto * result = query_conn(conn, query_str.c_str(), format.c_str()); + if (!result) + { + throw std::runtime_error("Query failed"); + } + return new query_result(result); + } +}; + +# ifdef PY_TEST_MAIN +# include +# include +# include +# include +# include +# include std::shared_ptr queryToArrow(const std::string & queryStr) @@ -124,7 +161,7 @@ int main() return 0; } -#else +# else PYBIND11_MODULE(_chdb, m) { m.doc() = "chDB module for query function"; @@ -182,6 +219,26 @@ PYBIND11_MODULE(_chdb, m) "Returns:\n" " List[str, str]: List of column name and type pairs."); + py::class_(m, "Connection") + .def( + py::init( + [](const std::string & path) + { + std::vector argv = {"clickhouse", "--path=" + path}; + std::vector argv_char; + argv_char.reserve(argv.size()); + for (auto & arg : argv) + argv_char.push_back(const_cast(arg.c_str())); + return new connection_wrapper(argv_char.size(), argv_char.data()); + }), + py::arg("path") = "") + .def( + "query", + &connection_wrapper::query, + py::arg("query_str"), + py::arg("format") = "CSV", + "Execute a query and return a query_result object"); + m.def( "query", &query, @@ -191,17 +248,7 @@ PYBIND11_MODULE(_chdb, m) py::arg("path") = "", py::arg("udf_path") = "", "Query chDB and return a query_result object"); - - // Pybind wrapper for: - // struct chdb_conn - // { - // void * server; // LocalServer * server; - // bool connected; - // }; - // CHDB_EXPORT chdb_conn * connect_chdb(int argc, char ** argv); - // CHDB_EXPORT void close_conn(chdb_conn * conn); - // CHDB_EXPORT struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format); } -#endif // PY_TEST_MAIN +# endif // PY_TEST_MAIN #endif // USE_PYTHON diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 20181538ee9..3e8a48d85c4 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1157,14 +1157,19 @@ std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) } } -int bgClickHouseLocal(int argc, char ** argv) +DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) { try { - DB::LocalServer app; - app.is_background = true; // Set background mode - app.init(argc, argv); - return app.run(); + auto * app = new DB::LocalServer(); + app->is_background = true; // Set background mode + app->init(argc, argv); + int ret = app->run(); + if (ret != 0) + { + throw std::domain_error(app->get_error_msg()); + } + return app; } catch (const DB::Exception & e) { @@ -1275,19 +1280,10 @@ chdb_conn * connect_chdb(int argc, char ** argv) std::lock_guard lock(connection_mutex); // Use background mode - int ret = bgClickHouseLocal(argc, argv); - if (ret != 0) - { - throw std::domain_error("Failed to run ClickHouse background server"); - } + DB::LocalServer * server = bgClickHouseLocal(argc, argv); auto * conn = new chdb_conn(); - auto * server = new DB::LocalServer(); conn->server = server; - server->is_background = true; - - // Initialize server in background mode - server->init(argc, argv); conn->connected = true; return conn; From 6ca4dd6c88cb935922fb4e2db48fa4c5a8dcd4c3 Mon Sep 17 00:00:00 2001 From: auxten Date: Sat, 2 Nov 2024 11:57:06 +0000 Subject: [PATCH 05/36] Fix SCOPE_EXIT --- programs/local/LocalServer.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 3e8a48d85c4..8dacba1dea4 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -523,10 +523,7 @@ try processConfig(); - if (!is_background) - { - SCOPE_EXIT({ cleanup(); }); - } + SCOPE_EXIT({ if (!is_background) cleanup(); }); initTTYBuffer(toProgressOption(getClientConfiguration().getString("progress", "default"))); ASTAlterCommand::setFormatAlterCommandsWithParentheses(true); From 7f21a4b697c1c7bc23afa33149b342918002bac0 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 4 Nov 2024 07:50:21 +0000 Subject: [PATCH 06/36] Fix output format --- programs/local/LocalChdb.cpp | 2 +- programs/local/LocalServer.cpp | 1 + src/Client/ClientBase.cpp | 6 ++++++ src/Client/ClientBase.h | 1 + 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index ecc35f622a2..e4b556ec357 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -219,7 +219,7 @@ PYBIND11_MODULE(_chdb, m) "Returns:\n" " List[str, str]: List of column name and type pairs."); - py::class_(m, "Connection") + py::class_(m, "connect") .def( py::init( [](const std::string & path) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 8dacba1dea4..e990bc009ec 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1329,6 +1329,7 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const if (format && *format) { server->client_context->setDefaultFormat(format); + server->setDefaultFormat(format); } // Check connection and reconnect if needed diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 441fe813620..2e7ebc6346d 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -707,6 +707,12 @@ bool ClientBase::isRegularFile(int fd) return fstat(fd, &file_stat) == 0 && S_ISREG(file_stat.st_mode); } +void ClientBase::setDefaultFormat(const String & format) +{ + default_output_format = format; + is_default_format = false; +} + void ClientBase::setDefaultFormatsAndCompressionFromConfiguration() { if (getClientConfiguration().has("output-format")) diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 2d026e13739..073c2c29f4e 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -108,6 +108,7 @@ class ClientBase size_t getProcessedBytes() const { return processed_bytes; } double getElapsedTime() const { return progress_indication.elapsedSeconds(); } std::string get_error_msg() const { return error_message_oss.str(); } + void setDefaultFormat(const String & format); ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements); From d82a1a7952088e67deac8afa09b8a472878d2fcb Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 4 Nov 2024 08:47:51 +0000 Subject: [PATCH 07/36] Fix chdb.h decl --- programs/local/chdb.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/programs/local/chdb.h b/programs/local/chdb.h index 01d924e0a68..642fa42ccb1 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -5,6 +5,7 @@ # include extern "C" { #else +# include # include # include #endif @@ -56,9 +57,9 @@ struct chdb_conn bool connected; }; -CHDB_EXPORT chdb_conn * connect_chdb(int argc, char ** argv); -CHDB_EXPORT void close_conn(chdb_conn * conn); -CHDB_EXPORT struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format); +CHDB_EXPORT struct chdb_conn * connect_chdb(int argc, char ** argv); +CHDB_EXPORT void close_conn(struct chdb_conn * conn); +CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format); #ifdef __cplusplus } From 744fa133a09eb14f628f00167535d8432caa969e Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 4 Nov 2024 08:48:11 +0000 Subject: [PATCH 08/36] Minimal changes on ClientBase --- programs/local/LocalServer.cpp | 16 ++-------------- src/Client/ClientBase.cpp | 17 +++++++++++++++++ src/Client/ClientBase.h | 13 +++++++------ 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index e990bc009ec..2a273e29b9b 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1159,7 +1159,7 @@ DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) try { auto * app = new DB::LocalServer(); - app->is_background = true; // Set background mode + app->setBackground(true); app->init(argc, argv); int ret = app->run(); if (ret != 0) @@ -1325,21 +1325,9 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const { auto * result = new local_result_v2{}; DB::LocalServer * server = static_cast(conn->server); - // Set output format if specified - if (format && *format) - { - server->client_context->setDefaultFormat(format); - server->setDefaultFormat(format); - } - - // Check connection and reconnect if needed - if (!conn->connected) - { - server->connect(); - } // Execute query - if (!server->processQueryText(query)) + if (!server->parseQueryTextWithOutputFormat(query, format)) { std::string error = server->get_error_msg(); result->error_message = new char[error.length() + 1]; diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 2e7ebc6346d..fc6fb1e8c39 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -2425,6 +2425,23 @@ bool ClientBase::processQueryText(const String & text) return executeMultiQuery(text); } +bool ClientBase::parseQueryTextWithOutputFormat(const String & query, const String & format) +{ + // Set output format if specified + if (!format.empty()) + { + client_context->setDefaultFormat(format); + setDefaultFormat(format); + } + + // Check connection and reconnect if needed + if (!connection->checkConnected(connection_parameters.timeouts)) + connect(); + + // Execute query + return processQueryText(query); +} + String ClientBase::prompt() const { diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 073c2c29f4e..f7a786d497a 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -109,18 +109,16 @@ class ClientBase double getElapsedTime() const { return progress_indication.elapsedSeconds(); } std::string get_error_msg() const { return error_message_oss.str(); } void setDefaultFormat(const String & format); + void setBackground(bool is_background_) { is_background = is_background_; } + bool parseQueryTextWithOutputFormat(const String & query, const String & format); ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements); - bool is_background = false; - - /// Returns true if query processing was successful. - bool processQueryText(const String & text); - protected: void runInteractive(); void runNonInteractive(); + bool is_background = false; void runBackground(); char * argv0 = nullptr; @@ -176,6 +174,9 @@ class ClientBase const std::vector & hosts_and_ports_arguments) = 0; virtual void processConfig() = 0; + /// Returns true if query processing was successful. + bool processQueryText(const String & text); + virtual void readArguments( int argc, char ** argv, @@ -269,7 +270,7 @@ class ClientBase ContextMutablePtr global_context; /// Client context is a context used only by the client to parse queries, process query parameters and to connect to clickhouse-server. - public: ContextMutablePtr client_context; + ContextMutablePtr client_context; bool is_interactive = false; /// Use either interactive line editing interface or batch mode. bool delayed_interactive = false; From aa946384e7830cba53e9f8c23e998422761fd6f4 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 4 Nov 2024 12:48:11 +0000 Subject: [PATCH 09/36] Fix get_error_msg name to getErrorMsg --- programs/local/LocalServer.cpp | 6 +++--- src/Client/ClientBase.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 2a273e29b9b..2222490fa42 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1136,7 +1136,7 @@ std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) app.getProcessedBytes(), app.getElapsedTime()); } else { - return std::make_unique(app.get_error_msg()); + return std::make_unique(app.getErrorMsg()); } } catch (const DB::Exception & e) @@ -1164,7 +1164,7 @@ DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) int ret = app->run(); if (ret != 0) { - throw std::domain_error(app->get_error_msg()); + throw std::domain_error(app->getErrorMsg()); } return app; } @@ -1329,7 +1329,7 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const // Execute query if (!server->parseQueryTextWithOutputFormat(query, format)) { - std::string error = server->get_error_msg(); + std::string error = server->getErrorMsg(); result->error_message = new char[error.length() + 1]; std::strcpy(result->error_message, error.c_str()); return result; diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index f7a786d497a..d1bd1d6388a 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -107,7 +107,7 @@ class ClientBase size_t getProcessedRows() const { return processed_rows; } size_t getProcessedBytes() const { return processed_bytes; } double getElapsedTime() const { return progress_indication.elapsedSeconds(); } - std::string get_error_msg() const { return error_message_oss.str(); } + std::string getErrorMsg() const { return error_message_oss.str(); } void setDefaultFormat(const String & format); void setBackground(bool is_background_) { is_background = is_background_; } bool parseQueryTextWithOutputFormat(const String & query, const String & format); From 68279089b661ddf8a5da5e4a163af9a6bc4e662f Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 7 Nov 2024 08:30:59 +0000 Subject: [PATCH 10/36] No exception on save default_database for now --- src/Interpreters/InterpreterUseQuery.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Interpreters/InterpreterUseQuery.cpp b/src/Interpreters/InterpreterUseQuery.cpp index 3999b5a2fef..8068a87d122 100644 --- a/src/Interpreters/InterpreterUseQuery.cpp +++ b/src/Interpreters/InterpreterUseQuery.cpp @@ -33,10 +33,11 @@ BlockIO InterpreterUseQuery::execute() tmp_path_fs << new_database; tmp_path_fs.close(); } - else - { - throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "Cannot open file {} for writing", default_database_path.string()); - } + //chdb todo: fix the following code on bgClickHouseLocal mode + // else + // { + // throw Exception(ErrorCodes::CANNOT_OPEN_FILE, "Cannot open file {} for writing", default_database_path.string()); + // } return {}; } From 7de1e76ec115252faa8dc4bce670187aeb902163 Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 7 Nov 2024 08:31:41 +0000 Subject: [PATCH 11/36] Add parquet(arrow) dep for local --- programs/local/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/local/CMakeLists.txt b/programs/local/CMakeLists.txt index 46605dccd20..053b2362386 100644 --- a/programs/local/CMakeLists.txt +++ b/programs/local/CMakeLists.txt @@ -57,6 +57,7 @@ set (CLICKHOUSE_LOCAL_LINK clickhouse_parsers clickhouse_storages_system clickhouse_table_functions + ch_contrib::parquet ) clickhouse_program_add(local) From 5d578b88a6191f614f0b0cd955d944691b965cc1 Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 7 Nov 2024 08:32:15 +0000 Subject: [PATCH 12/36] Use ArrowStream in cursor mode --- programs/local/LocalChdb.cpp | 296 +++++++++++++++++++++++++++++---- programs/local/LocalChdb.h | 80 +++++++-- programs/local/LocalServer.cpp | 2 +- 3 files changed, 338 insertions(+), 40 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index e4b556ec357..2f2a5891e69 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -2,11 +2,11 @@ #if USE_PYTHON -# include # include -# include +namespace py = pybind11; + extern bool inside_main = true; @@ -78,41 +78,267 @@ memoryview_wrapper * query_result::get_memview() return new memoryview_wrapper(this->result_wrapper); } -// Add these new wrapper classes -class connection_wrapper + +// Parse SQLite-style connection string +std::pair> connection_wrapper::parse_connection_string(const std::string & conn_str) { -private: - chdb_conn * conn; + std::string path; + std::map params; + + if (conn_str == ":memory:") + { + return {":memory:", params}; + } + + std::string working_str = conn_str; -public: - connection_wrapper(int argc, char ** argv) + // Handle file: prefix + if (working_str.starts_with("file:")) { - conn = connect_chdb(argc, argv); - if (!conn) + working_str = working_str.substr(5); + + // Handle triple slash for absolute paths + if (working_str.starts_with("///")) { - throw std::runtime_error("Failed to connect to chdb"); + working_str = working_str.substr(2); // Remove two slashes, keep one } } - ~connection_wrapper() + // Split path and parameters + auto query_pos = working_str.find('?'); + if (query_pos != std::string::npos) { - if (conn) + path = working_str.substr(0, query_pos); + std::string query = working_str.substr(query_pos + 1); + + // Parse parameters + std::istringstream params_stream(query); + std::string param; + while (std::getline(params_stream, param, '&')) { - close_conn(conn); - conn = nullptr; + auto eq_pos = param.find('='); + if (eq_pos != std::string::npos) + { + std::string key = param.substr(0, eq_pos); + std::string value = param.substr(eq_pos + 1); + params[key] = value; + } } } + else + { + path = working_str; + } - query_result * query(const std::string & query_str, const std::string & format = "CSV") + // Convert relative paths to absolute + if (!path.empty() && path[0] != '/') { - auto * result = query_conn(conn, query_str.c_str(), format.c_str()); - if (!result) + std::error_code ec; + path = std::filesystem::absolute(path, ec).string(); + if (ec) { - throw std::runtime_error("Query failed"); + throw std::runtime_error("Failed to resolve path: " + path); } - return new query_result(result); } -}; + + return {path, params}; +} + +std::vector +connection_wrapper::build_clickhouse_args(const std::string & path, const std::map & params) +{ + std::vector argv = {"clickhouse"}; + + if (path != ":memory:") + { + argv.push_back("--path=" + path); + } + + // Map SQLite parameters to ClickHouse arguments + for (const auto & [key, value] : params) + { + if (key == "mode") + { + if (value == "ro") + { + argv.push_back("--readonly=1"); + } + } + } + + return argv; +} + +void connection_wrapper::initialize_database() +{ + if (is_memory_db) + { + // Setup memory engine + query_result * ret = query("CREATE DATABASE IF NOT EXISTS default ENGINE = Memory; USE default"); + if (ret->has_error()) + { + auto err_msg = fmt::format("Failed to create memory database: {}", std::string(ret->error_message())); + delete ret; + throw std::runtime_error(err_msg); + } + } + else + { + // Create directory if it doesn't exist + std::filesystem::create_directories(db_path); + // Setup Atomic database + query_result * ret = query("CREATE DATABASE IF NOT EXISTS default ENGINE = Atomic; USE default"); + if (ret->has_error()) + { + auto err_msg = fmt::format("Failed to create database: {}", std::string(ret->error_message())); + delete ret; + throw std::runtime_error(err_msg); + } + } +} + +connection_wrapper::connection_wrapper(int argc, char ** argv) +{ + conn = connect_chdb(argc, argv); + if (!conn) + { + throw std::runtime_error("Failed to connect to chdb"); + } +} + +connection_wrapper::connection_wrapper(const std::string & conn_str) +{ + auto [path, params] = parse_connection_string(conn_str); + + db_path = path; + is_memory_db = (path == ":memory:"); + + auto argv = build_clickhouse_args(path, params); + + // Convert to char* array + std::vector argv_char; + argv_char.reserve(argv.size()); + for (auto & arg : argv) + { + argv_char.push_back(const_cast(arg.c_str())); + } + + conn = connect_chdb(argv_char.size(), argv_char.data()); + if (!conn) + { + throw std::runtime_error("Failed to connect to chdb"); + } + + initialize_database(); +} + +connection_wrapper::~connection_wrapper() +{ + if (conn) + { + close_conn(conn); + conn = nullptr; + } +} + +cursor_wrapper * connection_wrapper::cursor() +{ + return new cursor_wrapper(this); +} + +void connection_wrapper::commit() +{ + // do nothing +} + +void connection_wrapper::close() +{ + if (conn) + { + close_conn(conn); + conn = nullptr; + } +} + +query_result * connection_wrapper::query(const std::string & query_str, const std::string & format) +{ + return new query_result(query_conn(conn, query_str.c_str(), format.c_str())); +} + +void cursor_wrapper::execute(const std::string & query_str) +{ + if (current_result) + { + delete current_result; + current_result = nullptr; + } + current_row = 0; + + // Always use Arrow format internally + current_result = conn->query(query_str, "ArrowStream"); + + if (current_result->has_error()) + { + throw std::runtime_error(current_result->error_message()); + } + + if (current_result->data() == nullptr || current_result->size() == 0) + { + return; + } + // Convert Arrow buffer to Table with RecordBatchStreamReader + auto input_stream = std::make_shared(std::string_view(current_result->data(), current_result->size())); + auto batch_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream).ValueOrDie(); + std::vector> batches; + std::shared_ptr batch; + while (batch_reader->ReadNext(&batch).ok() && batch != nullptr) + { + batches.push_back(batch); + } + current_table = arrow::Table::FromRecordBatches(batches).ValueOrDie(); +} + +py::object cursor_wrapper::fetchone() +{ + if (!current_table || current_row >= current_table->num_rows()) + { + return py::none(); + } + + py::tuple row(current_table->num_columns()); + for (int i = 0; i < current_table->num_columns(); i++) + { + auto column = current_table->column(i); + // Convert Arrow column value to Python object based on type + // This is a simplified example + row[i] = py::cast(column->GetScalar(current_row).ValueOrDie()->ToString()); + } + current_row++; + return row; +} + +py::list cursor_wrapper::fetchall() +{ + py::list results; + while (true) + { + py::object row = fetchone(); + if (row.is_none()) + break; + results.append(row); + } + return results; +} + +py::object cursor_wrapper::__next__() +{ + py::object row = fetchone(); + if (row.is_none()) + { + throw py::stop_iteration(); + } + return row; +} # ifdef PY_TEST_MAIN # include @@ -219,19 +445,33 @@ PYBIND11_MODULE(_chdb, m) "Returns:\n" " List[str, str]: List of column name and type pairs."); + py::class_(m, "cursor") + .def(py::init()) + .def("execute", &cursor_wrapper::execute) + .def("fetchone", &cursor_wrapper::fetchone) + .def("fetchall", &cursor_wrapper::fetchall) + .def("__iter__", &cursor_wrapper::__iter__) + .def("__next__", &cursor_wrapper::__next__); + py::class_(m, "connect") .def( py::init( [](const std::string & path) { - std::vector argv = {"clickhouse", "--path=" + path}; - std::vector argv_char; - argv_char.reserve(argv.size()); - for (auto & arg : argv) - argv_char.push_back(const_cast(arg.c_str())); - return new connection_wrapper(argv_char.size(), argv_char.data()); + try + { + return new connection_wrapper(path); + } + catch (const std::exception & e) + { + throw py::error_already_set(); + } }), - py::arg("path") = "") + py::arg("path") = ":memory:") + .def("cursor", &connection_wrapper::cursor) + .def("execute", &connection_wrapper::query) + .def("commit", &connection_wrapper::commit) + .def("close", &connection_wrapper::close) .def( "query", &connection_wrapper::query, diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 6401c04f03b..f7a31b02473 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -1,20 +1,52 @@ #pragma once +#include #include "config.h" #if USE_PYTHON -#include "chdb.h" -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include +# include "chdb.h" namespace py = pybind11; -class local_result_wrapper; +class __attribute__((visibility("default"))) local_result_wrapper; +class __attribute__((visibility("default"))) connection_wrapper; +class __attribute__((visibility("default"))) cursor_wrapper; class __attribute__((visibility("default"))) memoryview_wrapper; class __attribute__((visibility("default"))) query_result; +class connection_wrapper +{ +private: + chdb_conn * conn; + std::string db_path; + bool is_memory_db; + +public: + connection_wrapper(int argc, char ** argv); + explicit connection_wrapper(const std::string & conn_str); + ~connection_wrapper(); + cursor_wrapper * cursor(); + void commit(); + void close(); + query_result * query(const std::string & query_str, const std::string & format = "CSV"); + + // Move the private methods declarations here + std::pair> parse_connection_string(const std::string & conn_str); + std::vector build_clickhouse_args(const std::string & path, const std::map & params); + void initialize_database(); +}; class local_result_wrapper { @@ -23,10 +55,7 @@ class local_result_wrapper public: local_result_wrapper(local_result_v2 * result) : result(result) { } - ~local_result_wrapper() - { - free_result_v2(result); - } + ~local_result_wrapper() { free_result_v2(result); } char * data() { if (result == nullptr) @@ -128,11 +157,11 @@ class memoryview_wrapper std::shared_ptr result_wrapper; public: - memoryview_wrapper(std::shared_ptr result) : result_wrapper(result) + explicit memoryview_wrapper(std::shared_ptr result) : result_wrapper(result) { // std::cerr << "memoryview_wrapper::memoryview_wrapper" << this->result->bytes() << std::endl; } - ~memoryview_wrapper() { } + ~memoryview_wrapper() = default; size_t size() { @@ -159,4 +188,33 @@ class memoryview_wrapper } } }; + +class cursor_wrapper +{ +private: + connection_wrapper * conn; + query_result * current_result; + size_t current_row; + std::shared_ptr current_table; + +public: + explicit cursor_wrapper(connection_wrapper * connection) + : conn(connection), current_result(nullptr), current_row(0), current_table(nullptr) + { + } + + ~cursor_wrapper() { delete current_result; } + + void execute(const std::string & query_str); + + py::object fetchone(); + + py::list fetchall(); + + // Support iteration + bool __iter__(py::object & self) { return true; } + py::object __next__(); +}; + + #endif diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 2222490fa42..29641af55b6 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1317,7 +1317,7 @@ void close_conn(chdb_conn * conn) struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) { if (!conn || !conn->connected) - return nullptr; + return new local_result_v2{}; std::lock_guard lock(connection_mutex); From 6a1c0d8457b91e4f7d1c3f3b3399d154175a2893 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 11 Nov 2024 10:25:06 +0000 Subject: [PATCH 13/36] Handle result vec in CH loop --- programs/local/LocalChdb.cpp | 86 +++++++--------------------------- programs/local/LocalChdb.h | 78 +++++++++++++++++++++++++----- programs/local/LocalServer.cpp | 7 ++- 3 files changed, 86 insertions(+), 85 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 2f2a5891e69..eab8af7df40 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -1,4 +1,5 @@ #include "LocalChdb.h" +#include "chdb.h" #if USE_PYTHON @@ -269,76 +270,22 @@ void cursor_wrapper::execute(const std::string & query_str) { if (current_result) { - delete current_result; + // The free_result_v2 vector is managed by the ClickHouse Engine + // As we don't want to copy the data, so just release the memory here. + // The memory will be released when the ClientBase.query_result_buf is reassigned. + if (current_result->_vec) + { + current_result->_vec = nullptr; + } + free_result_v2(current_result); + current_result = nullptr; } - current_row = 0; // Always use Arrow format internally - current_result = conn->query(query_str, "ArrowStream"); - - if (current_result->has_error()) - { - throw std::runtime_error(current_result->error_message()); - } - - if (current_result->data() == nullptr || current_result->size() == 0) - { - return; - } - // Convert Arrow buffer to Table with RecordBatchStreamReader - auto input_stream = std::make_shared(std::string_view(current_result->data(), current_result->size())); - auto batch_reader = arrow::ipc::RecordBatchStreamReader::Open(input_stream).ValueOrDie(); - std::vector> batches; - std::shared_ptr batch; - while (batch_reader->ReadNext(&batch).ok() && batch != nullptr) - { - batches.push_back(batch); - } - current_table = arrow::Table::FromRecordBatches(batches).ValueOrDie(); -} - -py::object cursor_wrapper::fetchone() -{ - if (!current_table || current_row >= current_table->num_rows()) - { - return py::none(); - } - - py::tuple row(current_table->num_columns()); - for (int i = 0; i < current_table->num_columns(); i++) - { - auto column = current_table->column(i); - // Convert Arrow column value to Python object based on type - // This is a simplified example - row[i] = py::cast(column->GetScalar(current_row).ValueOrDie()->ToString()); - } - current_row++; - return row; -} - -py::list cursor_wrapper::fetchall() -{ - py::list results; - while (true) - { - py::object row = fetchone(); - if (row.is_none()) - break; - results.append(row); - } - return results; + current_result = query_conn(conn->get_conn(), query_str.c_str(), "ArrowStream"); } -py::object cursor_wrapper::__next__() -{ - py::object row = fetchone(); - if (row.is_none()) - { - throw py::stop_iteration(); - } - return row; -} # ifdef PY_TEST_MAIN # include @@ -448,10 +395,13 @@ PYBIND11_MODULE(_chdb, m) py::class_(m, "cursor") .def(py::init()) .def("execute", &cursor_wrapper::execute) - .def("fetchone", &cursor_wrapper::fetchone) - .def("fetchall", &cursor_wrapper::fetchall) - .def("__iter__", &cursor_wrapper::__iter__) - .def("__next__", &cursor_wrapper::__next__); + .def("get_memview", &cursor_wrapper::get_memview) + .def("data_size", &cursor_wrapper::data_size) + .def("rows_read", &cursor_wrapper::rows_read) + .def("bytes_read", &cursor_wrapper::bytes_read) + .def("elapsed", &cursor_wrapper::elapsed) + .def("has_error", &cursor_wrapper::has_error) + .def("error_message", &cursor_wrapper::error_message); py::class_(m, "connect") .def( diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index f7a31b02473..46d16ae7f98 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include "config.h" @@ -36,6 +37,7 @@ class connection_wrapper public: connection_wrapper(int argc, char ** argv); explicit connection_wrapper(const std::string & conn_str); + chdb_conn * get_conn() { return conn; } ~connection_wrapper(); cursor_wrapper * cursor(); void commit(); @@ -138,7 +140,7 @@ class query_result public: query_result(local_result_v2 * result) : result_wrapper(std::make_shared(result)) { } - ~query_result() { } + ~query_result() = default; char * data() { return result_wrapper->data(); } py::bytes bytes() { return result_wrapper->bytes(); } py::str str() { return result_wrapper->str(); } @@ -193,27 +195,77 @@ class cursor_wrapper { private: connection_wrapper * conn; - query_result * current_result; - size_t current_row; - std::shared_ptr current_table; + local_result_v2 * current_result; public: - explicit cursor_wrapper(connection_wrapper * connection) - : conn(connection), current_result(nullptr), current_row(0), current_table(nullptr) - { - } + explicit cursor_wrapper(connection_wrapper * connection) : conn(connection), current_result(nullptr) { } ~cursor_wrapper() { delete current_result; } void execute(const std::string & query_str); - py::object fetchone(); + py::memoryview get_memview() + { + if (current_result == nullptr) + { + return py::memoryview(py::memoryview::from_memory(nullptr, 0, true)); + } + return py::memoryview(py::memoryview::from_memory(current_result->buf, current_result->len, true)); + } + + size_t data_size() + { + if (current_result == nullptr) + { + return 0; + } + return current_result->len; + } - py::list fetchall(); + size_t rows_read() + { + if (current_result == nullptr) + { + return 0; + } + return current_result->rows_read; + } - // Support iteration - bool __iter__(py::object & self) { return true; } - py::object __next__(); + size_t bytes_read() + { + if (current_result == nullptr) + { + return 0; + } + return current_result->bytes_read; + } + + double elapsed() + { + if (current_result == nullptr) + { + return 0; + } + return current_result->elapsed; + } + + bool has_error() + { + if (current_result == nullptr) + { + return false; + } + return current_result->error_message != nullptr; + } + + py::str error_message() + { + if (has_error()) + { + return py::str(current_result->error_message); + } + return py::str(); + } }; diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 29641af55b6..504a917798f 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1316,14 +1316,15 @@ void close_conn(chdb_conn * conn) struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) { + auto * result = new local_result_v2{ nullptr, 0, nullptr, 0, 0, 0, nullptr }; + if (!conn || !conn->connected) - return new local_result_v2{}; + return result; std::lock_guard lock(connection_mutex); try { - auto * result = new local_result_v2{}; DB::LocalServer * server = static_cast(conn->server); // Execute query @@ -1353,7 +1354,6 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const } catch (const DB::Exception & e) { - auto * result = new local_result_v2{}; std::string error = DB::getExceptionMessage(e, false); result->error_message = new char[error.length() + 1]; std::strcpy(result->error_message, error.c_str()); @@ -1361,7 +1361,6 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const } catch (...) { - auto * result = new local_result_v2{}; std::string error = DB::getCurrentExceptionMessage(true); result->error_message = new char[error.length() + 1]; std::strcpy(result->error_message, error.c_str()); From b259bece014ef87b9d3438c284f87e5f7a23b923 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 11 Nov 2024 13:24:03 +0000 Subject: [PATCH 14/36] Fix cursor_wrapper close --- programs/local/LocalChdb.cpp | 17 +++-------------- programs/local/LocalChdb.h | 21 ++++++++++++++++++++- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index eab8af7df40..005c36f94b2 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -146,7 +146,7 @@ std::pair> connection_wrapper::p } std::vector -connection_wrapper::build_clickhouse_args(const std::string & path, const std::map & params) +connection_wrapper::llbuild_clickhouse_args(const std::string & path, const std::map & params) { std::vector argv = {"clickhouse"}; @@ -268,19 +268,7 @@ query_result * connection_wrapper::query(const std::string & query_str, const st void cursor_wrapper::execute(const std::string & query_str) { - if (current_result) - { - // The free_result_v2 vector is managed by the ClickHouse Engine - // As we don't want to copy the data, so just release the memory here. - // The memory will be released when the ClientBase.query_result_buf is reassigned. - if (current_result->_vec) - { - current_result->_vec = nullptr; - } - free_result_v2(current_result); - - current_result = nullptr; - } + release_result(); // Always use Arrow format internally current_result = query_conn(conn->get_conn(), query_str.c_str(), "ArrowStream"); @@ -395,6 +383,7 @@ PYBIND11_MODULE(_chdb, m) py::class_(m, "cursor") .def(py::init()) .def("execute", &cursor_wrapper::execute) + .def("close", &cursor_wrapper::close) .def("get_memview", &cursor_wrapper::get_memview) .def("data_size", &cursor_wrapper::data_size) .def("rows_read", &cursor_wrapper::rows_read) diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 46d16ae7f98..ad668d844f9 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -197,13 +197,32 @@ class cursor_wrapper connection_wrapper * conn; local_result_v2 * current_result; + void release_result() + { + if (current_result) + { + // The free_result_v2 vector is managed by the ClickHouse Engine + // As we don't want to copy the data, so just release the memory here. + // The memory will be released when the ClientBase.query_result_buf is reassigned. + if (current_result->_vec) + { + current_result->_vec = nullptr; + } + free_result_v2(current_result); + + current_result = nullptr; + } + } + public: explicit cursor_wrapper(connection_wrapper * connection) : conn(connection), current_result(nullptr) { } - ~cursor_wrapper() { delete current_result; } + ~cursor_wrapper() { release_result(); } void execute(const std::string & query_str); + void close() { release_result(); } + py::memoryview get_memview() { if (current_result == nullptr) From a4b8d9ce5ce84a2027c043437d7840f88b15e375 Mon Sep 17 00:00:00 2001 From: auxten Date: Tue, 12 Nov 2024 10:53:21 +0000 Subject: [PATCH 15/36] Fix exception handling --- programs/local/LocalChdb.cpp | 32 ++++++++------------------------ programs/local/LocalChdb.h | 6 ++++++ programs/local/LocalServer.cpp | 32 ++++++++++++-------------------- 3 files changed, 26 insertions(+), 44 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 005c36f94b2..2ba5a47c92d 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -146,7 +146,7 @@ std::pair> connection_wrapper::p } std::vector -connection_wrapper::llbuild_clickhouse_args(const std::string & path, const std::map & params) +connection_wrapper::build_clickhouse_args(const std::string & path, const std::map & params) { std::vector argv = {"clickhouse"}; @@ -162,6 +162,7 @@ connection_wrapper::llbuild_clickhouse_args(const std::string & path, const std: { if (value == "ro") { + is_readonly = true; argv.push_back("--readonly=1"); } } @@ -172,6 +173,10 @@ connection_wrapper::llbuild_clickhouse_args(const std::string & path, const std: void connection_wrapper::initialize_database() { + if (is_readonly) + { + return; + } if (is_memory_db) { // Setup memory engine @@ -201,10 +206,6 @@ void connection_wrapper::initialize_database() connection_wrapper::connection_wrapper(int argc, char ** argv) { conn = connect_chdb(argc, argv); - if (!conn) - { - throw std::runtime_error("Failed to connect to chdb"); - } } connection_wrapper::connection_wrapper(const std::string & conn_str) @@ -225,11 +226,6 @@ connection_wrapper::connection_wrapper(const std::string & conn_str) } conn = connect_chdb(argv_char.size(), argv_char.data()); - if (!conn) - { - throw std::runtime_error("Failed to connect to chdb"); - } - initialize_database(); } @@ -383,6 +379,7 @@ PYBIND11_MODULE(_chdb, m) py::class_(m, "cursor") .def(py::init()) .def("execute", &cursor_wrapper::execute) + .def("commit", &cursor_wrapper::commit) .def("close", &cursor_wrapper::close) .def("get_memview", &cursor_wrapper::get_memview) .def("data_size", &cursor_wrapper::data_size) @@ -393,20 +390,7 @@ PYBIND11_MODULE(_chdb, m) .def("error_message", &cursor_wrapper::error_message); py::class_(m, "connect") - .def( - py::init( - [](const std::string & path) - { - try - { - return new connection_wrapper(path); - } - catch (const std::exception & e) - { - throw py::error_already_set(); - } - }), - py::arg("path") = ":memory:") + .def(py::init([](const std::string & path) { return new connection_wrapper(path); }), py::arg("path") = ":memory:") .def("cursor", &connection_wrapper::cursor) .def("execute", &connection_wrapper::query) .def("commit", &connection_wrapper::commit) diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index ad668d844f9..2c99e659314 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -33,6 +33,7 @@ class connection_wrapper chdb_conn * conn; std::string db_path; bool is_memory_db; + bool is_readonly; public: connection_wrapper(int argc, char ** argv); @@ -221,6 +222,11 @@ class cursor_wrapper void execute(const std::string & query_str); + void commit() + { + // do nothing + } + void close() { release_result(); } py::memoryview get_memview() diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 504a917798f..4c65cce108e 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1164,13 +1164,16 @@ DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) int ret = app->run(); if (ret != 0) { - throw std::domain_error(app->getErrorMsg()); + auto err_msg = app->getErrorMsg(); + LOG_ERROR(&app->logger(), "Error running bgClickHouseLocal: {}", err_msg); + delete app; + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Error running bgClickHouseLocal: {}", err_msg); } return app; } catch (const DB::Exception & e) { - throw std::domain_error(DB::getExceptionMessage(e, false)); + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", DB::getExceptionMessage(e, false)); } catch (...) { @@ -1272,27 +1275,16 @@ std::mutex connection_mutex; chdb_conn * connect_chdb(int argc, char ** argv) { - try - { - std::lock_guard lock(connection_mutex); + std::lock_guard lock(connection_mutex); - // Use background mode - DB::LocalServer * server = bgClickHouseLocal(argc, argv); + // Use background mode + DB::LocalServer * server = bgClickHouseLocal(argc, argv); - auto * conn = new chdb_conn(); - conn->server = server; - conn->connected = true; + auto * conn = new chdb_conn(); + conn->server = server; + conn->connected = true; - return conn; - } - catch (const DB::Exception & e) - { - throw std::domain_error(DB::getExceptionMessage(e, false)); - } - catch (...) - { - throw std::domain_error(DB::getCurrentExceptionMessage(true)); - } + return conn; } From b8f44e3f359c32ece15d7e778f31c7f31d62e90f Mon Sep 17 00:00:00 2001 From: auxten Date: Wed, 13 Nov 2024 13:16:34 +0000 Subject: [PATCH 16/36] If conn_str empty, use :memory: --- programs/local/LocalChdb.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 2ba5a47c92d..20e8e0c934b 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -86,7 +86,7 @@ std::pair> connection_wrapper::p std::string path; std::map params; - if (conn_str == ":memory:") + if (conn_str.empty() || conn_str == ":memory:") { return {":memory:", params}; } From a1a8fdd33091472b885a993b2257eb1a9057b3a7 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:23:42 +0000 Subject: [PATCH 17/36] Add pyarrow and pandas as dep --- setup.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/setup.py b/setup.py index 93aae87ceef..1bb12c5765b 100644 --- a/setup.py +++ b/setup.py @@ -268,6 +268,10 @@ def build_extensions(self): exclude_package_data={"": ["*.pyc", "src/**"]}, ext_modules=ext_modules, python_requires=">=3.8", + install_requires=[ + "pyarrow>=13.0.0", + "pandas>=2.0.0", + ], cmdclass={"build_ext": BuildExt}, test_suite="tests", zip_safe=False, From af9761e59dfb845030874e03a1ba6aa9df272785 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:24:29 +0000 Subject: [PATCH 18/36] Fix close_conn --- programs/local/LocalChdb.cpp | 30 +++++---------- programs/local/LocalChdb.h | 4 +- programs/local/LocalServer.cpp | 67 +++++++++++++++++++++++++--------- programs/local/chdb.h | 4 +- 4 files changed, 63 insertions(+), 42 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 20e8e0c934b..d7c2922c1c3 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -1,4 +1,5 @@ #include "LocalChdb.h" +#include #include "chdb.h" #if USE_PYTHON @@ -212,12 +213,7 @@ connection_wrapper::connection_wrapper(const std::string & conn_str) { auto [path, params] = parse_connection_string(conn_str); - db_path = path; - is_memory_db = (path == ":memory:"); - auto argv = build_clickhouse_args(path, params); - - // Convert to char* array std::vector argv_char; argv_char.reserve(argv.size()); for (auto & arg : argv) @@ -226,16 +222,19 @@ connection_wrapper::connection_wrapper(const std::string & conn_str) } conn = connect_chdb(argv_char.size(), argv_char.data()); + db_path = path; + is_memory_db = (path == ":memory:"); initialize_database(); } connection_wrapper::~connection_wrapper() { - if (conn) - { - close_conn(conn); - conn = nullptr; - } + close_conn(conn); +} + +void connection_wrapper::close() +{ + close_conn(conn); } cursor_wrapper * connection_wrapper::cursor() @@ -248,18 +247,9 @@ void connection_wrapper::commit() // do nothing } -void connection_wrapper::close() -{ - if (conn) - { - close_conn(conn); - conn = nullptr; - } -} - query_result * connection_wrapper::query(const std::string & query_str, const std::string & format) { - return new query_result(query_conn(conn, query_str.c_str(), format.c_str())); + return new query_result(query_conn(*conn, query_str.c_str(), format.c_str())); } void cursor_wrapper::execute(const std::string & query_str) diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 2c99e659314..b30f1dd1b3a 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -30,7 +30,7 @@ class __attribute__((visibility("default"))) query_result; class connection_wrapper { private: - chdb_conn * conn; + chdb_conn ** conn; std::string db_path; bool is_memory_db; bool is_readonly; @@ -38,7 +38,7 @@ class connection_wrapper public: connection_wrapper(int argc, char ** argv); explicit connection_wrapper(const std::string & conn_str); - chdb_conn * get_conn() { return conn; } + chdb_conn * get_conn() { return *conn; } ~connection_wrapper(); cursor_wrapper * cursor(); void commit(); diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 4c65cce108e..27b2014e4f5 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -65,7 +65,9 @@ namespace fs = std::filesystem; - +std::mutex global_connection_mutex; +chdb_conn * global_conn_ptr = nullptr; +std::string global_db_path; namespace CurrentMetrics { extern const Metric MemoryTracking; @@ -1271,39 +1273,68 @@ void free_result_v2(local_result_v2 * result) delete result; } -std::mutex connection_mutex; - -chdb_conn * connect_chdb(int argc, char ** argv) +chdb_conn ** connect_chdb(int argc, char ** argv) { - std::lock_guard lock(connection_mutex); + std::lock_guard lock(global_connection_mutex); - // Use background mode - DB::LocalServer * server = bgClickHouseLocal(argc, argv); + // Check if we already have a connection with this path + std::string path = ":memory:"; // Default path + for (int i = 1; i < argc; i++) + { + if (strncmp(argv[i], "--path=", 7) == 0) + { + path = argv[i] + 7; + break; + } + } + if (global_conn_ptr != nullptr) + { + if (path == global_db_path) + { + // Return existing connection + return &global_conn_ptr; + } + throw DB::Exception( + DB::ErrorCodes::BAD_ARGUMENTS, + "Another connection is already active with different path. Close the existing connection first."); + } + + // Create new connection + DB::LocalServer * server = bgClickHouseLocal(argc, argv); auto * conn = new chdb_conn(); conn->server = server; conn->connected = true; - return conn; -} + // Store globally + global_conn_ptr = conn; + global_db_path = path; + return &global_conn_ptr; +} -void close_conn(chdb_conn * conn) +void close_conn(chdb_conn ** conn) { - if (!conn) - return; + std::lock_guard lock(global_connection_mutex); - std::lock_guard lock(connection_mutex); + if (!conn || !*conn) + return; - if (conn->connected) + if ((*conn)->connected) { - DB::LocalServer * server = static_cast(conn->server); - // Cleanup suggestions before context + DB::LocalServer * server = static_cast((*conn)->server); server->cleanup(); delete server; + + if (*conn == global_conn_ptr) + { + global_conn_ptr = nullptr; + global_db_path.clear(); + } } - delete conn; + delete *conn; + *conn = nullptr; } struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const char * format) @@ -1313,7 +1344,7 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const if (!conn || !conn->connected) return result; - std::lock_guard lock(connection_mutex); + std::lock_guard lock(global_connection_mutex); try { diff --git a/programs/local/chdb.h b/programs/local/chdb.h index 642fa42ccb1..a01b9f77367 100644 --- a/programs/local/chdb.h +++ b/programs/local/chdb.h @@ -57,8 +57,8 @@ struct chdb_conn bool connected; }; -CHDB_EXPORT struct chdb_conn * connect_chdb(int argc, char ** argv); -CHDB_EXPORT void close_conn(struct chdb_conn * conn); +CHDB_EXPORT struct chdb_conn ** connect_chdb(int argc, char ** argv); +CHDB_EXPORT void close_conn(struct chdb_conn ** conn); CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format); #ifdef __cplusplus From cef9c1b73dc7e59a6f68e0414518c6bfb912396b Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:25:25 +0000 Subject: [PATCH 19/36] Add sqlitelike API for python --- chdb/__init__.py | 3 ++ chdb/state/__init__.py | 3 ++ chdb/state/sqlitelike.py | 93 ++++++++++++++++++++++++++++++++++++++++ chdb/utils/__init__.py | 1 + 4 files changed, 100 insertions(+) create mode 100644 chdb/state/__init__.py create mode 100644 chdb/state/sqlitelike.py diff --git a/chdb/__init__.py b/chdb/__init__.py index 1cab17ec211..2cbd2682a7a 100644 --- a/chdb/__init__.py +++ b/chdb/__init__.py @@ -85,8 +85,10 @@ def query(sql, output_format="CSV", path="", udf_path=""): PyReader = _chdb.PyReader from . import dbapi, session, udf, utils # noqa: E402 +from .state import connect # noqa: E402 __all__ = [ + "_chdb", "PyReader", "ChdbError", "query", @@ -99,4 +101,5 @@ def query(sql, output_format="CSV", path="", udf_path=""): "session", "udf", "utils", + "connect", ] diff --git a/chdb/state/__init__.py b/chdb/state/__init__.py new file mode 100644 index 00000000000..7c8f7d7ea01 --- /dev/null +++ b/chdb/state/__init__.py @@ -0,0 +1,3 @@ +from .sqlitelike import connect + +__all__ = ["connect"] diff --git a/chdb/state/sqlitelike.py b/chdb/state/sqlitelike.py new file mode 100644 index 00000000000..d34f25bbb81 --- /dev/null +++ b/chdb/state/sqlitelike.py @@ -0,0 +1,93 @@ +import io +from typing import Optional, List, Any +from chdb import _chdb + +# try import pyarrow if failed, raise ImportError with suggestion +try: + import pyarrow as pa # noqa +except ImportError as e: + print(f"ImportError: {e}") + print('Please install pyarrow via "pip install pyarrow"') + raise ImportError("Failed to import pyarrow") from None + + +class Connection: + def __init__(self, connection_string: str): + # print("Connection", connection_string) + self._cursor: Optional[Cursor] = None + self._conn = _chdb.connect(connection_string) + + def cursor(self) -> "Cursor": + self._cursor = Cursor(self._conn) + return self._cursor + + def query(self, query: str, format: str = "ArrowStream") -> Any: + return self._conn.query(query, format) + + def close(self) -> None: + # print("close") + if self._cursor: + self._cursor.close() + self._conn.close() + + +class Cursor: + def __init__(self, connection): + self._conn = connection + self._cursor = self._conn.cursor() + self._current_table: Optional[pa.Table] = None + self._current_row: int = 0 + + def execute(self, query: str) -> None: + self._cursor.execute(query) + result_mv = self._cursor.get_memview() + # print("get_result", result_mv) + if self._cursor.has_error(): + raise Exception(self._cursor.get_error()) + if self._cursor.data_size() == 0: + self._current_table = None + self._current_row = 0 + return + arrow_data = result_mv.tobytes() + reader = pa.ipc.open_stream(io.BytesIO(arrow_data)) + self._current_table = reader.read_all() + self._current_row = 0 + + def commit(self) -> None: + self._cursor.commit() + + def fetchone(self) -> Optional[tuple]: + if not self._current_table or self._current_row >= len(self._current_table): + return None + + row_dict = { + col: self._current_table.column(col)[self._current_row].as_py() + for col in self._current_table.column_names + } + self._current_row += 1 + return tuple(row_dict.values()) + + def fetchall(self) -> List[tuple]: + if not self._current_table: + return [] + + remaining_rows = [] + while (row := self.fetchone()) is not None: + remaining_rows.append(row) + return remaining_rows + + def close(self) -> None: + self._cursor.close() + + def __iter__(self): + return self + + def __next__(self) -> tuple: + row = self.fetchone() + if row is None: + raise StopIteration + return row + + +def connect(connection_string: str) -> Connection: + return Connection(connection_string) diff --git a/chdb/utils/__init__.py b/chdb/utils/__init__.py index b0905b0008e..a5cf4fcbbb6 100644 --- a/chdb/utils/__init__.py +++ b/chdb/utils/__init__.py @@ -5,4 +5,5 @@ "convert_to_columnar", "infer_data_type", "infer_data_types", + "trace", ] From 47f78c9a60a859ea6ee9bc498bdd51abac0bb355 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:26:24 +0000 Subject: [PATCH 20/36] Add test_conn_cursor --- tests/test_conn_cursor.py | 269 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 tests/test_conn_cursor.py diff --git a/tests/test_conn_cursor.py b/tests/test_conn_cursor.py new file mode 100644 index 00000000000..5f83b668d6f --- /dev/null +++ b/tests/test_conn_cursor.py @@ -0,0 +1,269 @@ +import unittest +import os +import shutil +from typing import List, Any, Dict + +from chdb import connect + +enable_print = False +db_path = "test_db_3fdds" + + +class TestCHDB(unittest.TestCase): + def setUp(self): + if os.path.exists(db_path): + shutil.rmtree(db_path) + + def test_basic_operations(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Create a table + cursor.execute( + """ + CREATE TABLE users ( + id Int32, + name String, + scores Array(UInt8) + ) ENGINE = Memory + """ + ) + + # Insert test data + cursor.execute( + """ + INSERT INTO users VALUES + (1, 'Alice', [95, 87, 92]), + (2, 'Bob', [88, 85, 90]), + (3, 'Charlie', [91, 89, 94]) + """ + ) + + # Test fetchone + cursor.execute("SELECT * FROM users WHERE id = 1") + row = cursor.fetchone() + print(row) + self.assertEqual(row[0], 1) + self.assertEqual(row[1], "Alice") + self.assertEqual(row[2], [95, 87, 92]) + + cursor.execute("SELECT * FROM users WHERE id = 2") + row = cursor.fetchone() + print(row) + self.assertEqual(row[0], 2) + self.assertEqual(row[1], "Bob") + self.assertEqual(row[2], [88, 85, 90]) + + row = cursor.fetchone() + self.assertIsNone(row) + + # Test fetchall + print("fetchall") + cursor.execute("SELECT * FROM users ORDER BY id") + rows = cursor.fetchall() + self.assertEqual(len(rows), 3) + self.assertEqual(rows[1][1], "Bob") + + # Test iteration + cursor.execute("SELECT * FROM users ORDER BY id") + rows = [row for row in cursor] + self.assertEqual(len(rows), 3) + self.assertEqual(rows[2][1], "Charlie") + cursor.close() + conn.close() + + def test_connection_management(self): + # Test file-based connection + file_conn = connect(f"file:{db_path}") + self.assertIsNotNone(file_conn) + file_conn.close() + + # Test connection with parameters + readonly_conn = connect(f"file:{db_path}?mode=ro") + self.assertIsNotNone(readonly_conn) + with self.assertRaises(Exception): + cur = readonly_conn.cursor() + cur.execute("CREATE TABLE test (id Int32) ENGINE = Memory") + readonly_conn.close() + + # Test create dir fails + with self.assertRaises(Exception): + # try to create a directory with this test file name + # which will fail surely + connect("test_conn_cursor.py") + + def test_cursor_error_handling(self): + conn = connect(":memory:") + cursor = conn.cursor() + try: + # Test syntax error + with self.assertRaises(Exception): + cursor.execute("INVALID SQL QUERY") + + # Test table not found error + with self.assertRaises(Exception): + cursor.execute("SELECT * FROM nonexistent_table") + finally: + cursor.close() + conn.close() + + def test_transaction_behavior(self): + # Create test table + conn = connect(":memory:") + cursor = conn.cursor() + cursor.execute( + """ + CREATE TABLE test_transactions ( + id Int32, + value String + ) ENGINE = Memory + """ + ) + + # Test basic insert + cursor.execute("INSERT INTO test_transactions VALUES (1, 'test')") + cursor.commit() # Should work even though Memory engine doesn't support transactions + + # Verify data + cursor.execute("SELECT * FROM test_transactions") + row = cursor.fetchone() + self.assertEqual(row, (1, "test")) + + def test_cursor_data_types(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Test various data types + cursor.execute( + """ + CREATE TABLE type_test ( + int_val Int32, + float_val Float64, + string_val String, + array_val Array(Int32), + nullable_val Nullable(String), + date_val Date, + datetime_val DateTime + ) ENGINE = Memory + """ + ) + + cursor.execute( + """ + INSERT INTO type_test VALUES + (42, 3.14, 'hello', [1,2,3], NULL, '2023-01-01', '2023-01-01 12:00:00') + """ + ) + + cursor.execute("SELECT * FROM type_test") + row = cursor.fetchone() + self.assertEqual(row[0], 42) + self.assertAlmostEqual(row[1], 3.14) + self.assertEqual(row[2], "hello") + self.assertEqual(row[3], [1, 2, 3]) + self.assertIsNone(row[4]) + + def test_cursor_multiple_results(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Create test data + cursor.execute( + """ + CREATE TABLE multi_test (id Int32, value String) ENGINE = Memory; + INSERT INTO multi_test VALUES (1, 'one'), (2, 'two'), (3, 'three'); + """ + ) + + # Test partial fetching + cursor.execute("SELECT * FROM multi_test ORDER BY id") + first_row = cursor.fetchone() + self.assertEqual(first_row, (1, "one")) + + remaining_rows = cursor.fetchall() + self.assertEqual(len(remaining_rows), 2) + self.assertEqual(remaining_rows[0], (2, "two")) + + def test_query_formats(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Create test data + cursor.execute( + """ + CREATE TABLE format_test (id Int32, value String) ENGINE = Memory; + INSERT INTO format_test VALUES (1, 'test'); + """ + ) + + # Test different output formats + csv_result = conn.query("SELECT * FROM format_test", format="CSV") + self.assertIsNotNone(csv_result) + + arrow_result = conn.query("SELECT * FROM format_test", format="ArrowStream") + self.assertIsNotNone(arrow_result) + + def test_cursor_statistics(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Create and populate test table + cursor.execute( + """ + CREATE TABLE stats_test (id Int32, value String) ENGINE = Memory; + INSERT INTO stats_test SELECT number, toString(number) + FROM numbers(1000); + """ + ) + + # Execute query and check statistics + cursor.execute("SELECT * FROM stats_test") + self.assertGreater(cursor._cursor.rows_read(), 0) + self.assertGreater(cursor._cursor.bytes_read(), 0) + self.assertGreater(cursor._cursor.elapsed(), 0) + + def test_memory_management(self): + conn = connect(":memory:") + cursor = conn.cursor() + # Test multiple executions + for i in range(10): + cursor = conn.cursor() + cursor.execute("SELECT 1") + self.assertIsNotNone(cursor.fetchone()) + + # Test large result sets + cursor.execute( + """ + SELECT number, toString(number) as str_num + FROM numbers(1000000) + """ + ) + rows = cursor.fetchall() + self.assertEqual(len(rows), 1000000) + + def test_multiple_connections(self): + conn1 = connect(":memory:") + conn2 = connect(":memory:") + cursor1 = conn1.cursor() + cursor2 = conn2.cursor() + + with self.assertRaises(Exception): + connect("file:test.db") + + # Create table in first connection + cursor1.execute( + """ + CREATE TABLE test_table (id Int32, value String) ENGINE = Memory + """ + ) + + # Insert data in second connection + cursor2.execute("INSERT INTO test_table VALUES (1, 'test')") + cursor2.commit() + + # Query data from first connection + cursor1.execute("SELECT * FROM test_table") + row = cursor1.fetchone() + self.assertEqual(row, (1, "test")) + + conn1.close() + conn2.close() + + +if __name__ == "__main__": + unittest.main() From 2365709dfc4a17904155d6acde57cf57bc3fa711 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:26:53 +0000 Subject: [PATCH 21/36] Add trace utils --- chdb/utils/trace.py | 73 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 chdb/utils/trace.py diff --git a/chdb/utils/trace.py b/chdb/utils/trace.py new file mode 100644 index 00000000000..018387c4fa5 --- /dev/null +++ b/chdb/utils/trace.py @@ -0,0 +1,73 @@ +import functools +import inspect +import sys +import linecache +from datetime import datetime + +enable_print = False + +def print_lines(func): + if not enable_print: + return func + + @functools.wraps(func) + def wrapper(*args, **kwargs): + # Get function name and determine if it's a method + is_method = inspect.ismethod(func) or ( + len(args) > 0 and hasattr(args[0].__class__, func.__name__) + ) + class_name = args[0].__class__.__name__ if is_method else None + + # Get the source code of the function + try: + source_lines, start_line = inspect.getsourcelines(func) + except OSError: + # Handle cases where source might not be available + print(f"Warning: Could not get source for {func.__name__}") + return func(*args, **kwargs) + + def trace(frame, event, arg): + if event == "line": + # Get the current line number and code + line_no = frame.f_lineno + line = linecache.getline(frame.f_code.co_filename, line_no).strip() + + # Don't print decorator lines or empty lines + if line and not line.startswith("@"): + # Get local variables + local_vars = frame.f_locals.copy() + if is_method: + # Remove 'self' from local variables for clarity + local_vars.pop("self", None) + + # Format timestamp + timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] + + # Create context string (class.method or function) + context = ( + f"{class_name}.{func.__name__}" if class_name else func.__name__ + ) + + # Print execution information + print(f"[{timestamp}] {context} line {line_no}: {line}") + + # Print local variables if they exist and have changed + if local_vars: + vars_str = ", ".join( + f"{k}={repr(v)}" for k, v in local_vars.items() + ) + print(f" Variables: {vars_str}") + return trace + + # Set the trace function + sys.settrace(trace) + + # Call the original function + result = func(*args, **kwargs) + + # Disable tracing + sys.settrace(None) + + return result + + return wrapper From 1399019aacf29df739db7e02b41d55e56b18006c Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:28:05 +0000 Subject: [PATCH 22/36] Add .cursorignore --- .cursorignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .cursorignore diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 00000000000..ad66b56ef27 --- /dev/null +++ b/.cursorignore @@ -0,0 +1 @@ +contrib/ From ca08ea04165df3298f5b6e3f0faec1b60b01f581 Mon Sep 17 00:00:00 2001 From: auxten Date: Mon, 18 Nov 2024 09:34:37 +0000 Subject: [PATCH 23/36] Fix some pylint issue --- chdb/utils/trace.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/chdb/utils/trace.py b/chdb/utils/trace.py index 018387c4fa5..34a197053f5 100644 --- a/chdb/utils/trace.py +++ b/chdb/utils/trace.py @@ -6,6 +6,7 @@ enable_print = False + def print_lines(func): if not enable_print: return func @@ -16,7 +17,7 @@ def wrapper(*args, **kwargs): is_method = inspect.ismethod(func) or ( len(args) > 0 and hasattr(args[0].__class__, func.__name__) ) - class_name = args[0].__class__.__name__ if is_method else None + class_name = args[0].__class__.__name__ if is_method else None # type: ignore # Get the source code of the function try: From 40e7143955ef6e9675414224fd11b74e740fcab2 Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 06:04:25 +0000 Subject: [PATCH 24/36] Fix lint --- chdb/utils/trace.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chdb/utils/trace.py b/chdb/utils/trace.py index 34a197053f5..62e61a325cf 100644 --- a/chdb/utils/trace.py +++ b/chdb/utils/trace.py @@ -17,7 +17,7 @@ def wrapper(*args, **kwargs): is_method = inspect.ismethod(func) or ( len(args) > 0 and hasattr(args[0].__class__, func.__name__) ) - class_name = args[0].__class__.__name__ if is_method else None # type: ignore + class_name = args[0].__class__.__name__ if is_method else None # type: ignore # Get the source code of the function try: From ab080f7755cde23c6e8f938d0f5a9d4014c41849 Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 13:07:27 +0000 Subject: [PATCH 25/36] Add keep_buf switch for local_result_v2 --- programs/local/LocalChdb.cpp | 2 +- programs/local/LocalChdb.h | 22 ++++++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index d7c2922c1c3..1a1ea8ef7fb 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -249,7 +249,7 @@ void connection_wrapper::commit() query_result * connection_wrapper::query(const std::string & query_str, const std::string & format) { - return new query_result(query_conn(*conn, query_str.c_str(), format.c_str())); + return new query_result(query_conn(*conn, query_str.c_str(), format.c_str()), true); } void cursor_wrapper::execute(const std::string & query_str) diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index b30f1dd1b3a..8a53ab80dfa 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -55,10 +55,27 @@ class local_result_wrapper { private: local_result_v2 * result; + bool keep_buf; // background server mode will handle buf in ClickHouse engine public: - local_result_wrapper(local_result_v2 * result) : result(result) { } - ~local_result_wrapper() { free_result_v2(result); } + local_result_wrapper(local_result_v2 * result) : result(result), keep_buf(false) { } + local_result_wrapper(local_result_v2 * result, bool keep_buf) : result(result), keep_buf(keep_buf) { } + ~local_result_wrapper() + { + if (keep_buf) + { + if (!result) + return; + + result->_vec = nullptr; + delete[] result->error_message; + delete result; + } + else + { + free_result_v2(result); + } + } char * data() { if (result == nullptr) @@ -141,6 +158,7 @@ class query_result public: query_result(local_result_v2 * result) : result_wrapper(std::make_shared(result)) { } + query_result(local_result_v2 * result, bool keep_buf) : result_wrapper(std::make_shared(result, keep_buf)) { } ~query_result() = default; char * data() { return result_wrapper->data(); } py::bytes bytes() { return result_wrapper->bytes(); } From 233c3b171dd97464852325603f29035e9982ee6d Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 13:08:14 +0000 Subject: [PATCH 26/36] Use sqlitelike API in DBAPI --- chdb/dbapi/connections.py | 189 ++++++++------------------------------ chdb/dbapi/cursors.py | 118 ++++++------------------ chdb/state/sqlitelike.py | 21 ++++- 3 files changed, 80 insertions(+), 248 deletions(-) diff --git a/chdb/dbapi/connections.py b/chdb/dbapi/connections.py index 090aa5500f9..773f428cba5 100644 --- a/chdb/dbapi/connections.py +++ b/chdb/dbapi/connections.py @@ -1,7 +1,7 @@ -import json from . import err from .cursors import Cursor from . import converters +from ..state import sqlitelike as chdb_stateful DEBUG = False VERBOSE = False @@ -10,56 +10,29 @@ class Connection(object): """ Representation of a connection with chdb. - - The proper way to get an instance of this class is to call - connect(). - - Accepts several arguments: - - :param cursorclass: Custom cursor class to use. - :param path: Optional folder path to store database files on disk. - - See `Connection `_ in the - specification. """ - _closed = False - _session = None - - def __init__(self, cursorclass=Cursor, path=None): - - self._resp = None - - # 1. pre-process params in init - self.encoding = 'utf8' - - self.cursorclass = cursorclass - - self._result = None + def __init__(self, path=None): + self._closed = False + self.encoding = "utf8" self._affected_rows = 0 + self._resp = None - self.connect(path) + # Initialize sqlitelike connection + connection_string = ":memory:" if path is None else f"file:{path}" + self._conn = chdb_stateful.Connection(connection_string) - def connect(self, path=None): - from chdb import session as chs - self._session = chs.Session(path) - self._closed = False - self._execute_command("select 1;") - self._read_query_result() + # Test connection with a simple query + cursor = self._conn.cursor() + cursor.execute("SELECT 1") + cursor.close() def close(self): - """ - Send the quit message and close the socket. - - See `Connection.close() `_ - in the specification. - - :raise Error: If the connection is already closed. - """ + """Send the quit message and close the socket.""" if self._closed: raise err.Error("Already closed") self._closed = True - self._session = None + self._conn.close() @property def open(self): @@ -67,81 +40,41 @@ def open(self): return not self._closed def commit(self): - """ - Commit changes to stable storage. - - See `Connection.commit() `_ - in the specification. - """ - return + """Commit changes to stable storage.""" + # No-op for ClickHouse + pass def rollback(self): - """ - Roll back the current transaction. - - See `Connection.rollback() `_ - in the specification. - """ - return + """Roll back the current transaction.""" + # No-op for ClickHouse + pass def cursor(self, cursor=None): - """ - Create a new cursor to execute queries with. - - :param cursor: The type of cursor to create; current only :py:class:`Cursor` - None means use Cursor. - """ + """Create a new cursor to execute queries with.""" + if self._closed: + raise err.Error("Connection closed") if cursor: - return cursor(self) - return self.cursorclass(self) + return Cursor(self) + return Cursor(self) - # The following methods are INTERNAL USE ONLY (called from Cursor) - def query(self, sql): - if isinstance(sql, str): - sql = sql.encode(self.encoding, 'surrogateescape') - self._execute_command(sql) - self._affected_rows = self._read_query_result() - return self._affected_rows - - def _execute_command(self, sql): - """ - :raise InterfaceError: If the connection is closed. - :raise ValueError: If no username was specified. - """ + def query(self, sql, fmt="ArrowStream"): + """Execute a query and return the raw result.""" if self._closed: raise err.InterfaceError("Connection closed") if isinstance(sql, str): - sql = sql.encode(self.encoding) + sql = sql.encode(self.encoding, "surrogateescape") - if isinstance(sql, bytearray): - sql = bytes(sql) - - # drop last command return - if self._resp is not None: - self._resp = None - - if DEBUG: - print("DEBUG: query:", sql) try: - res = self._session.query(sql, fmt="JSON") - if res.has_error(): - raise err.DatabaseError(res.error_message()) - self._resp = res.data() + result = self._conn.query(sql.decode(), fmt) + self._resp = result + return result except Exception as error: - raise err.InterfaceError("query err: %s" % error) + raise err.InterfaceError(f"Query error: {error}") def escape(self, obj, mapping=None): - """Escape whatever value you pass to it. - - Non-standard, for internal use; do not use this in your applications. - """ - if isinstance(obj, str): - return "'" + self.escape_string(obj) + "'" - if isinstance(obj, (bytes, bytearray)): - ret = self._quote_bytes(obj) - return ret - return converters.escape_item(obj, mapping=mapping) + """Escape whatever value you pass to it.""" + return converters.escape_item(obj, mapping) def escape_string(self, s): return converters.escape_string(s) @@ -149,13 +82,6 @@ def escape_string(self, s): def _quote_bytes(self, s): return converters.escape_bytes(s) - def _read_query_result(self): - self._result = None - result = CHDBResult(self) - result.read() - self._result = result - return result.affected_rows - def __enter__(self): """Context manager that returns a Cursor""" return self.cursor() @@ -166,52 +92,9 @@ def __exit__(self, exc, value, traceback): self.rollback() else: self.commit() + self.close() @property def resp(self): + """Return the last query response""" return self._resp - - -class CHDBResult(object): - def __init__(self, connection): - """ - :type connection: Connection - """ - self.connection = connection - self.affected_rows = 0 - self.insert_id = None - self.warning_count = 0 - self.message = None - self.field_count = 0 - self.description = None - self.rows = None - self.has_next = None - - def read(self): - # Handle empty responses (for instance from CREATE TABLE) - if self.connection.resp is None: - return - - try: - data = json.loads(self.connection.resp) - except Exception as error: - raise err.InterfaceError("Unsupported response format:" % error) - - try: - self.field_count = len(data["meta"]) - description = [] - for meta in data["meta"]: - fields = [meta["name"], meta["type"]] - description.append(tuple(fields)) - self.description = tuple(description) - - rows = [] - for line in data["data"]: - row = [] - for i in range(self.field_count): - column_data = converters.convert_column_data(self.description[i][1], line[self.description[i][0]]) - row.append(column_data) - rows.append(tuple(row)) - self.rows = tuple(rows) - except Exception as error: - raise err.InterfaceError("Read return data err:" % error) diff --git a/chdb/dbapi/cursors.py b/chdb/dbapi/cursors.py index ee9e0fa5e8c..6b16ea29ad8 100644 --- a/chdb/dbapi/cursors.py +++ b/chdb/dbapi/cursors.py @@ -29,13 +29,11 @@ class Cursor(object): def __init__(self, connection): self.connection = connection + self._cursor = connection._conn.cursor() self.description = None self.rowcount = -1 - self.rownumber = 0 self.arraysize = 1 self.lastrowid = None - self._result = None - self._rows = None self._executed = None def __enter__(self): @@ -83,14 +81,7 @@ def close(self): """ Closing a cursor just exhausts all remaining data. """ - conn = self.connection - if conn is None: - return - try: - while self.nextset(): - pass - finally: - self.connection = None + self._cursor.close() def _get_db(self): if not self.connection: @@ -121,33 +112,6 @@ def mogrify(self, query, args=None): return query - def _clear_result(self): - self.rownumber = 0 - self._result = None - - self.rowcount = 0 - self.description = None - self.lastrowid = None - self._rows = None - - def _do_get_result(self): - conn = self._get_db() - - self._result = result = conn._result - - self.rowcount = result.affected_rows - self.description = result.description - self.lastrowid = result.insert_id - self._rows = result.rows - - def _query(self, q): - conn = self._get_db() - self._last_executed = q - self._clear_result() - conn.query(q) - self._do_get_result() - return self.rowcount - def execute(self, query, args=None): """Execute a query @@ -162,14 +126,24 @@ def execute(self, query, args=None): If args is a list or tuple, %s can be used as a placeholder in the query. If args is a dict, %(name)s can be used as a placeholder in the query. """ - while self.nextset(): - pass + if args is not None: + query = query % self._escape_args(args, self.connection) + + self._cursor.execute(query) - query = self.mogrify(query, args) + # Get description from Arrow schema + if self._cursor._current_table is not None: + self.description = [ + (field.name, field.type.to_pandas_dtype(), None, None, None, None, None) + for field in self._cursor._current_table.schema + ] + self.rowcount = self._cursor._current_table.num_rows + else: + self.description = None + self.rowcount = -1 - result = self._query(query) self._executed = query - return result + return self.rowcount def executemany(self, query, args): # type: (str, list) -> int @@ -233,34 +207,21 @@ def _check_executed(self): def fetchone(self): """Fetch the next row""" - self._check_executed() - if self._rows is None or self.rownumber >= len(self._rows): - return None - result = self._rows[self.rownumber] - self.rownumber += 1 - return result - - def fetchmany(self, size=None): + if not self._executed: + raise err.ProgrammingError("execute() first") + return self._cursor.fetchone() + + def fetchmany(self, size=1): """Fetch several rows""" - self._check_executed() - if self._rows is None: - return () - end = self.rownumber + (size or self.arraysize) - result = self._rows[self.rownumber:end] - self.rownumber = min(end, len(self._rows)) - return result + if not self._executed: + raise err.ProgrammingError("execute() first") + return self._cursor.fetchmany(size) def fetchall(self): """Fetch all the rows""" - self._check_executed() - if self._rows is None: - return () - if self.rownumber: - result = self._rows[self.rownumber:] - else: - result = self._rows - self.rownumber = len(self._rows) - return result + if not self._executed: + raise err.ProgrammingError("execute() first") + return self._cursor.fetchall() def nextset(self): """Get the next query set""" @@ -272,26 +233,3 @@ def setinputsizes(self, *args): def setoutputsizes(self, *args): """Does nothing, required by DB API.""" - - -class DictCursor(Cursor): - """A cursor which returns results as a dictionary""" - # You can override this to use OrderedDict or other dict-like types. - dict_type = dict - - def _do_get_result(self): - super()._do_get_result() - fields = [] - if self.description: - for f in self.description: - name = f[0] - fields.append(name) - self._fields = fields - - if fields and self._rows: - self._rows = [self._conv_row(r) for r in self._rows] - - def _conv_row(self, row): - if row is None: - return None - return self.dict_type(zip(self._fields, row)) diff --git a/chdb/state/sqlitelike.py b/chdb/state/sqlitelike.py index d34f25bbb81..d0e2a722e60 100644 --- a/chdb/state/sqlitelike.py +++ b/chdb/state/sqlitelike.py @@ -43,7 +43,7 @@ def execute(self, query: str) -> None: result_mv = self._cursor.get_memview() # print("get_result", result_mv) if self._cursor.has_error(): - raise Exception(self._cursor.get_error()) + raise Exception(self._cursor.error_message()) if self._cursor.data_size() == 0: self._current_table = None self._current_row = 0 @@ -67,14 +67,25 @@ def fetchone(self) -> Optional[tuple]: self._current_row += 1 return tuple(row_dict.values()) - def fetchall(self) -> List[tuple]: + def fetchmany(self, size: int = 1) -> tuple[tuple]: if not self._current_table: - return [] + return tuple() + + rows = [] + for _ in range(size): + if (row := self.fetchone()) is None: + break + rows.append(row) + return tuple(rows) + + def fetchall(self) -> tuple[tuple]: + if not self._current_table: + return tuple() remaining_rows = [] while (row := self.fetchone()) is not None: remaining_rows.append(row) - return remaining_rows + return tuple(remaining_rows) def close(self) -> None: self._cursor.close() @@ -89,5 +100,5 @@ def __next__(self) -> tuple: return row -def connect(connection_string: str) -> Connection: +def connect(connection_string: str = ":memory:") -> Connection: return Connection(connection_string) From 401255b5ebb0445a8faf62f1a3b5a580f792189c Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 13:08:43 +0000 Subject: [PATCH 27/36] Fix error in example --- examples/dbapi.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/examples/dbapi.py b/examples/dbapi.py index 82baa6f6f37..b09ec988b3c 100644 --- a/examples/dbapi.py +++ b/examples/dbapi.py @@ -12,23 +12,23 @@ cur1.close() conn1.close() -conn2 = dbapi.connect(cursorclass=DictCursor) -cur2 = conn2.cursor() -cur2.execute(''' -SELECT - town, - district, - count() AS c, - round(avg(price)) AS price -FROM url('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet') -GROUP BY - town, - district -LIMIT 10 -''') -print("description", cur2.description) -for row in cur2: - print(row) +# conn2 = dbapi.connect(cursorclass=DictCursor) +# cur2 = conn2.cursor() +# cur2.execute(''' +# SELECT +# town, +# district, +# count() AS c, +# round(avg(price)) AS price +# FROM url('https://datasets-documentation.s3.eu-west-3.amazonaws.com/house_parquet/house_0.parquet') +# GROUP BY +# town, +# district +# LIMIT 10 +# ''') +# print("description", cur2.description) +# for row in cur2: +# print(row) -cur2.close() -conn2.close() +# cur2.close() +# conn2.close() From 238e019c3a71fb60a87e3be769180c26e365fc81 Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 13:09:02 +0000 Subject: [PATCH 28/36] Fix unittest for DBAPI and SQLite like API --- tests/test_conn_cursor.py | 7 +++ tests/test_dbapi.py | 101 ++++++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 43 deletions(-) diff --git a/tests/test_conn_cursor.py b/tests/test_conn_cursor.py index 5f83b668d6f..fe4afda3cc2 100644 --- a/tests/test_conn_cursor.py +++ b/tests/test_conn_cursor.py @@ -14,6 +14,13 @@ def setUp(self): if os.path.exists(db_path): shutil.rmtree(db_path) + def test_conn_query_without_receiving_result(self): + conn = connect() + conn.query("SELECT 1", "CSV") + conn.query("SELECT 1", "Null") + conn.query("SELECT 1", "Null") + conn.close() + def test_basic_operations(self): conn = connect(":memory:") cursor = conn.cursor() diff --git a/tests/test_dbapi.py b/tests/test_dbapi.py index 0ed283e4b9b..eb16f6c7cec 100644 --- a/tests/test_dbapi.py +++ b/tests/test_dbapi.py @@ -1,80 +1,95 @@ #!/usr/bin/env python3 +import os +import shutil +import tempfile import unittest from chdb import dbapi # version should be string split by '.' # eg. '0.12.0' or '0.12.0rc1' or '0.12.0beta1' or '0.12.0alpha1' or '0.12.0a1' -expected_version_pattern = r'^\d+\.\d+\.\d+(.*)?$' -expected_clickhouse_version_pattern = r'^\d+\.\d+\.\d+.\d+$' +expected_version_pattern = r"^\d+\.\d+\.\d+(.*)?$" +expected_clickhouse_version_pattern = r"^\d+\.\d+\.\d+.\d+$" class TestDBAPI(unittest.TestCase): + def test_select_version(self): conn = dbapi.connect() cur = conn.cursor() - cur.execute('select version()') # ClickHouse version - description = cur.description + cur.execute("select version()") # ClickHouse version + # description = cur.description data = cur.fetchone() cur.close() conn.close() # Add your assertions here to validate the description and data - print(description) + # print(description) print(data) self.assertRegex(data[0], expected_clickhouse_version_pattern) def test_insert_and_read_data(self): - conn = dbapi.connect() - cur = conn.cursor() - cur.execute("CREATE DATABASE IF NOT EXISTS test_db ENGINE = Atomic") - cur.execute("USE test_db") - cur.execute(""" - CREATE TABLE rate ( - day Date, - value Int64 - ) ENGINE = ReplacingMergeTree ORDER BY day""") + # make a tmp dir context + with tempfile.TemporaryDirectory() as tmpdirname: + conn = dbapi.connect(tmpdirname) + print(conn) + cur = conn.cursor() + # cur.execute("CREATE DATABASE IF NOT EXISTS test_db ENGINE = Atomic") + # cur.execute("USE test_db") + cur.execute( + """ + CREATE TABLE rate ( + day Date, + value Int64 + ) ENGINE = ReplacingMergeTree ORDER BY day""" + ) - # Insert single value - cur.execute("INSERT INTO rate VALUES (%s, %s)", ("2021-01-01", 24)) - # Insert multiple values - cur.executemany("INSERT INTO rate VALUES (%s, %s)", [("2021-01-02", 128), ("2021-01-03", 256)]) - # Test executemany outside optimized INSERT/REPLACE path - cur.executemany("ALTER TABLE rate UPDATE value = %s WHERE day = %s", [(72, "2021-01-02"), (96, "2021-01-03")]) + # Insert single value + cur.execute("INSERT INTO rate VALUES (%s, %s)", ("2021-01-01", 24)) + # Insert multiple values + cur.executemany( + "INSERT INTO rate VALUES (%s, %s)", + [("2021-01-02", 128), ("2021-01-03", 256)], + ) + # Test executemany outside optimized INSERT/REPLACE path + cur.executemany( + "ALTER TABLE rate UPDATE value = %s WHERE day = %s", + [(72, "2021-01-02"), (96, "2021-01-03")], + ) - # Test fetchone - cur.execute("SELECT value FROM rate ORDER BY day DESC LIMIT 2") - row1 = cur.fetchone() - self.assertEqual(row1, (96,)) - row2 = cur.fetchone() - self.assertEqual(row2, (72,)) - row3 = cur.fetchone() - self.assertIsNone(row3) + # Test fetchone + cur.execute("SELECT value FROM rate ORDER BY day DESC LIMIT 2") + row1 = cur.fetchone() + self.assertEqual(row1, (96,)) + row2 = cur.fetchone() + self.assertEqual(row2, (72,)) + row3 = cur.fetchone() + self.assertIsNone(row3) - # Test fetchmany - cur.execute("SELECT value FROM rate ORDER BY day DESC") - result_set1 = cur.fetchmany(2) - self.assertEqual(result_set1, ((96,), (72,))) - result_set2 = cur.fetchmany(1) - self.assertEqual(result_set2, ((24,),)) + # Test fetchmany + cur.execute("SELECT value FROM rate ORDER BY day DESC") + result_set1 = cur.fetchmany(2) + self.assertEqual(result_set1, ((96,), (72,))) + result_set2 = cur.fetchmany(1) + self.assertEqual(result_set2, ((24,),)) - # Test fetchall - cur.execute("SELECT value FROM rate ORDER BY day DESC") - rows = cur.fetchall() - self.assertEqual(rows, ((96,), (72,), (24,))) + # Test fetchall + cur.execute("SELECT value FROM rate ORDER BY day DESC") + rows = cur.fetchall() + self.assertEqual(rows, ((96,), (72,), (24,))) - # Clean up - cur.close() - conn.close() + # Clean up + cur.close() + conn.close() def test_select_chdb_version(self): ver = dbapi.get_client_info() # chDB version liek '0.12.0' ver_tuple = dbapi.chdb_version # chDB version tuple like ('0', '12', '0') print(ver) print(ver_tuple) - self.assertEqual(ver, '.'.join(ver_tuple)) + self.assertEqual(ver, ".".join(ver_tuple)) self.assertRegex(ver, expected_version_pattern) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() From a82d02856a8ce49bae1ae9fd24ffdc267665e59f Mon Sep 17 00:00:00 2001 From: auxten Date: Thu, 21 Nov 2024 13:11:25 +0000 Subject: [PATCH 29/36] Fix lint --- chdb/state/sqlitelike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chdb/state/sqlitelike.py b/chdb/state/sqlitelike.py index d0e2a722e60..b02e5ff1b05 100644 --- a/chdb/state/sqlitelike.py +++ b/chdb/state/sqlitelike.py @@ -1,5 +1,5 @@ import io -from typing import Optional, List, Any +from typing import Optional, Any from chdb import _chdb # try import pyarrow if failed, raise ImportError with suggestion From 0f1ae8cb319156b1c9dab517a0ac5c21179c0619 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 04:50:22 +0000 Subject: [PATCH 30/36] Fix Python 3.8 --- chdb/state/sqlitelike.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chdb/state/sqlitelike.py b/chdb/state/sqlitelike.py index b02e5ff1b05..479a122627f 100644 --- a/chdb/state/sqlitelike.py +++ b/chdb/state/sqlitelike.py @@ -67,7 +67,7 @@ def fetchone(self) -> Optional[tuple]: self._current_row += 1 return tuple(row_dict.values()) - def fetchmany(self, size: int = 1) -> tuple[tuple]: + def fetchmany(self, size: int = 1) -> tuple: if not self._current_table: return tuple() @@ -78,7 +78,7 @@ def fetchmany(self, size: int = 1) -> tuple[tuple]: rows.append(row) return tuple(rows) - def fetchall(self) -> tuple[tuple]: + def fetchall(self) -> tuple: if not self._current_table: return tuple() From ec37ebe394ef7f47898edefa919aeaad429fd459 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:04:54 +0000 Subject: [PATCH 31/36] Handle parameters without values --- programs/local/LocalChdb.cpp | 19 ++++++++++++++----- programs/local/LocalChdb.h | 1 - programs/local/LocalServer.cpp | 26 +++++++++++++++++++------- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 1a1ea8ef7fb..606ad070d76 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -125,6 +125,11 @@ std::pair> connection_wrapper::p std::string value = param.substr(eq_pos + 1); params[key] = value; } + else if (!param.empty()) + { + // Handle parameters without values + params[param] = ""; + } } } else @@ -167,6 +172,15 @@ connection_wrapper::build_clickhouse_args(const std::string & path, const std::m argv.push_back("--readonly=1"); } } + else if (value.empty()) + { + // Handle parameters without values (like ?withoutarg) + argv.push_back("--" + key); + } + else + { + argv.push_back("--" + key + "=" + value); + } } return argv; @@ -204,11 +218,6 @@ void connection_wrapper::initialize_database() } } -connection_wrapper::connection_wrapper(int argc, char ** argv) -{ - conn = connect_chdb(argc, argv); -} - connection_wrapper::connection_wrapper(const std::string & conn_str) { auto [path, params] = parse_connection_string(conn_str); diff --git a/programs/local/LocalChdb.h b/programs/local/LocalChdb.h index 8a53ab80dfa..3193d4893e7 100644 --- a/programs/local/LocalChdb.h +++ b/programs/local/LocalChdb.h @@ -36,7 +36,6 @@ class connection_wrapper bool is_readonly; public: - connection_wrapper(int argc, char ** argv); explicit connection_wrapper(const std::string & conn_str); chdb_conn * get_conn() { return *conn; } ~connection_wrapper(); diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 27b2014e4f5..6ea1ed4f447 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1158,9 +1158,10 @@ std::unique_ptr pyEntryClickHouseLocal(int argc, char ** argv) DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) { + DB::LocalServer * app = nullptr; try { - auto * app = new DB::LocalServer(); + app = new DB::LocalServer(); app->setBackground(true); app->init(argc, argv); int ret = app->run(); @@ -1175,10 +1176,22 @@ DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) } catch (const DB::Exception & e) { + delete app; throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", DB::getExceptionMessage(e, false)); } + catch (const Poco::Exception & e) + { + delete app; + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "bgClickHouseLocal {}", e.displayText()); + } + catch (const std::exception & e) + { + delete app; + throw std::domain_error(e.what()); + } catch (...) { + delete app; throw std::domain_error(DB::getCurrentExceptionMessage(true)); } } @@ -1360,13 +1373,12 @@ struct local_result_v2 * query_conn(chdb_conn * conn, const char * query, const } // Get query results without copying - auto * output_vec = server->getQueryOutputVector(); - if (output_vec && !output_vec->empty()) + auto output_span = server->getQueryOutputSpan(); + if (!output_span.empty()) { - // Take ownership of the vector - result->_vec = std::move(output_vec); - result->buf = output_vec->data(); - result->len = output_vec->size(); + result->_vec = nullptr; + result->buf = output_span.data(); + result->len = output_span.size(); } result->rows_read = server->getProcessedRows(); From 4a61bbe3329e634f3593a6b022ed618115edc695 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:05:24 +0000 Subject: [PATCH 32/36] Add getQueryOutputSpan --- src/Client/ClientBase.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index d1bd1d6388a..464646f2c87 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -104,6 +104,13 @@ class ClientBase // std::vector vec(buf.begin(), buf.end()); return query_result_memory; } + + std::span getQueryOutputSpan() + { + auto size = query_result_buf->count(); + return std::span(query_result_memory->begin(), size); + } + size_t getProcessedRows() const { return processed_rows; } size_t getProcessedBytes() const { return processed_bytes; } double getElapsedTime() const { return progress_indication.elapsedSeconds(); } From 7b293dd0b7adbe3e9adbf1411a4c78f676c881e3 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:24:06 +0000 Subject: [PATCH 33/36] Add doc string for chdb.connect --- chdb/state/sqlitelike.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/chdb/state/sqlitelike.py b/chdb/state/sqlitelike.py index 479a122627f..b99eb5e868d 100644 --- a/chdb/state/sqlitelike.py +++ b/chdb/state/sqlitelike.py @@ -101,4 +101,32 @@ def __next__(self) -> tuple: def connect(connection_string: str = ":memory:") -> Connection: + """ + Create a connection to chDB backgroud server. + Only one open connection is allowed per process. Use `close` to close the connection. + If called with the same connection string, the same connection object will be returned. + You can use the connection object to create cursor object. `cursor` method will return a cursor object. + + Args: + connection_string (str, optional): Connection string. Defaults to ":memory:". + Aslo support file path like: + - ":memory:" (for in-memory database) + - "test.db" (for relative path) + - "file:test.db" (same as above) + - "/path/to/test.db" (for absolute path) + - "file:/path/to/test.db" (same as above) + - "file:test.db?param1=value1¶m2=value2" (for relative path with query params) + - "///path/to/test.db?param1=value1¶m2=value2" (for absolute path) + + Connection string args handling: + Connection string can contain query params like "file:test.db?param1=value1¶m2=value2" + "param1=value1" will be passed to ClickHouse engine as start up args. + + For more details, see `clickhouse local --help --verbose` + Some special args handling: + - "mode=ro" would be "--readonly=1" for clickhouse (read-only mode) + + Returns: + Connection: Connection object + """ return Connection(connection_string) From da541d169725654a2817d9df068c89cb709ad061 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:24:27 +0000 Subject: [PATCH 34/36] Fix double free --- programs/local/LocalServer.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 6ea1ed4f447..ff31e18cc3e 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -1170,6 +1170,7 @@ DB::LocalServer * bgClickHouseLocal(int argc, char ** argv) auto err_msg = app->getErrorMsg(); LOG_ERROR(&app->logger(), "Error running bgClickHouseLocal: {}", err_msg); delete app; + app = nullptr; throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Error running bgClickHouseLocal: {}", err_msg); } return app; From b5945e14a28807adaecdb82d4c9107bb671a72e6 Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:24:43 +0000 Subject: [PATCH 35/36] Fix null check --- src/Client/ClientBase.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 464646f2c87..adbfd7f60b5 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -107,6 +107,8 @@ class ClientBase std::span getQueryOutputSpan() { + if (!query_result_memory || !query_result_buf) + return {}; auto size = query_result_buf->count(); return std::span(query_result_memory->begin(), size); } From 7c3cdde7b332bdc8fb576e9264d4a683dbb6e89b Mon Sep 17 00:00:00 2001 From: auxten Date: Fri, 22 Nov 2024 13:40:38 +0000 Subject: [PATCH 36/36] Test connect properties --- tests/test_conn_cursor.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tests/test_conn_cursor.py b/tests/test_conn_cursor.py index fe4afda3cc2..adf40108568 100644 --- a/tests/test_conn_cursor.py +++ b/tests/test_conn_cursor.py @@ -5,7 +5,6 @@ from chdb import connect -enable_print = False db_path = "test_db_3fdds" @@ -271,6 +270,23 @@ def test_multiple_connections(self): conn1.close() conn2.close() + def test_connection_properties(self): + # conn = connect("{db_path}?log_queries=1&verbose=1&log-level=test") + with self.assertRaises(Exception): + conn = connect(f"{db_path}?not_exist_flag=1") + with self.assertRaises(Exception): + conn = connect(f"{db_path}?verbose=1") + + conn = connect(f"{db_path}?verbose&log-level=test") + ret = conn.query("SELECT 123", "CSV") + print(ret) + print(len(ret)) + self.assertEqual(str(ret), "123\n") + ret = conn.query("show tables in system", "CSV") + self.assertGreater(len(ret), 10) + + conn.close() + if __name__ == "__main__": unittest.main()