From a18c82f9e20c7243e114cf0802464a2c077db7ca Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Fri, 15 Nov 2024 23:57:41 +0000 Subject: [PATCH 1/7] Fix md5 return_type to only return Utf8 as per current code impl. --- datafusion/functions/src/crypto/md5.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions/src/crypto/md5.rs b/datafusion/functions/src/crypto/md5.rs index 0f18fd47b4cf..0e8ff1cd3192 100644 --- a/datafusion/functions/src/crypto/md5.rs +++ b/datafusion/functions/src/crypto/md5.rs @@ -64,11 +64,11 @@ impl ScalarUDFImpl for Md5Func { fn return_type(&self, arg_types: &[DataType]) -> Result { use DataType::*; Ok(match &arg_types[0] { - LargeUtf8 | LargeBinary => LargeUtf8, + LargeUtf8 | LargeBinary => Utf8, Utf8View | Utf8 | Binary => Utf8, Null => Null, Dictionary(_, t) => match **t { - LargeUtf8 | LargeBinary => LargeUtf8, + LargeUtf8 | LargeBinary => Utf8, Utf8 | Binary => Utf8, Null => Null, _ => { From 2b1af718b3a3382cf5a7cedadb163531e20c48ff Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sat, 28 Dec 2024 22:30:31 +0000 Subject: [PATCH 2/7] Add support for sqlite test files to sqllogictest --- .gitmodules | 4 + datafusion-testing | 1 + datafusion/sqllogictest/Cargo.toml | 9 +- datafusion/sqllogictest/README.md | 44 +- datafusion/sqllogictest/bin/sqllogictests.rs | 522 +++++++++++++++--- .../engines/datafusion_engine/normalize.rs | 4 + .../src/engines/datafusion_engine/runner.rs | 65 ++- .../src/engines/postgres_engine/mod.rs | 63 ++- 8 files changed, 613 insertions(+), 99 deletions(-) create mode 100755 datafusion-testing diff --git a/.gitmodules b/.gitmodules index ec5d6208b8dd..037accdbe424 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,7 @@ [submodule "testing"] path = testing url = https://github.com/apache/arrow-testing +[submodule "datafusion-testing"] + path = datafusion-testing + url = https://github.com/apache/datafusion-testing.git + branch = main diff --git a/datafusion-testing b/datafusion-testing new file mode 100755 index 000000000000..8ec2a3ebcfea --- /dev/null +++ b/datafusion-testing @@ -0,0 +1 @@ +e2e320c9477a6d8ab09662eae255887733c0e304 \ No newline at end of file diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 0d648941e81e..88afdfceb412 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -45,15 +45,19 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } +indicatif = "0.17" itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } +once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } sqllogictest = "0.25.0" sqlparser = { workspace = true } tempfile = { workspace = true } +testcontainers = { version = "0.23", features = ["default"], optional = true } +testcontainers-modules = { version = "0.11", features = ["postgres"], optional = true } thiserror = "2.0.0" tokio = { workspace = true } tokio-postgres = { version = "0.7.12", optional = true } @@ -63,9 +67,12 @@ avro = ["datafusion/avro"] postgres = [ "bytes", "chrono", - "tokio-postgres", + "once_cell", "postgres-types", "postgres-protocol", + "testcontainers", + "testcontainers-modules", + "tokio-postgres", ] [dev-dependencies] diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 885e92fee270..124735c89d87 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -28,7 +28,8 @@ This crate is a submodule of DataFusion that contains an implementation of [sqll ## Overview This crate uses [sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse and run `.slt` files in the -[`test_files`](test_files) directory of this crate. +[`test_files`](test_files) directory of this crate or the [`data/sqlite`](sqlite) +directory of the datafusion-testing crate. ## Testing setup @@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information Test files that start with prefix `pg_compat_` verify compatibility with Postgres by running the same script files both with DataFusion and with Postgres -In order to run the sqllogictests running against a previously running Postgres instance, do: +In order to have the sqllogictest run against an existing running Postgres instance, do: ```shell PG_COMPAT=true PG_URI="postgresql://postgres@127.0.0.1/postgres" cargo test --features=postgres --test sqllogictests @@ -172,7 +173,7 @@ The environment variables: 2. `PG_URI` contains a `libpq` style connection string, whose format is described in [the docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url) -One way to create a suitable a posgres container in docker is to use +One way to create a suitable a postgres container in docker is to use the [Official Image](https://hub.docker.com/_/postgres) with a command such as the following. Note the collation **must** be set to `C` otherwise `ORDER BY` will not match DataFusion and the tests will diff. @@ -185,6 +186,15 @@ docker run \ postgres ``` +If you do not want to create a new postgres database and you have docker +installed you can skip providing a PG_URI env variable and the sqllogictest +runner will automatically create a temporary postgres docker container. +For example: + +```shell +PG_COMPAT=true cargo test --features=postgres --test sqllogictests +``` + ## Running Tests: `tpch` Test files in `tpch` directory runs against the `TPCH` data set (SF = @@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests: INCLUDE_TPCH=true cargo test --test sqllogictests ``` +## Running Tests: `sqlite` + +Test files in `data/sqlite` directory of the datafusion-testing crate were +sourced from the sqlite test suite and have been cleansed and updated to +run within DataFusion's sqllogictest runner. + +To run the sqlite tests you need to increase the rust stack size and add +`INCLUDE_SQLITE=true` to run the sqlite tests: + +```shell +export RUST_MIN_STACK=30485760; +INCLUDE_SQLITE=true cargo test --test sqllogictests +``` + +Note that there are well over 5 million queries in these tests and running the +sqlite tests will take a long time. You may wish to run them in release-nonlto mode: + +```shell +INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests +``` + +The sqlite tests can also be run with the postgres runner to verify compatibility: + +```shell +export RUST_MIN_STACK=30485760; +PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests +``` + ## Updating tests: Completion Mode In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 066cc8ee9824..498539c1674a 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -16,57 +16,129 @@ // under the License. use clap::Parser; +use datafusion_common::instant::Instant; use datafusion_common::utils::get_available_parallelism; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_sqllogictest::{DataFusion, TestContext}; use futures::stream::StreamExt; +use indicatif::{ + HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle, +}; use itertools::Itertools; -use log::info; -use sqllogictest::{strict_column_validator, Normalizer}; +use log::Level::{Info, Warn}; +use log::{info, log_enabled, warn}; +#[cfg(feature = "postgres")] +use once_cell::sync::Lazy; +use sqllogictest::{ + parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record, + Validator, +}; +#[cfg(feature = "postgres")] +use std::env::set_var; use std::ffi::OsStr; use std::fs; +#[cfg(feature = "postgres")] +use std::future::Future; use std::path::{Path, PathBuf}; - -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; -use datafusion_common_runtime::SpawnedTask; +#[cfg(feature = "postgres")] +use std::{env, thread}; +#[cfg(feature = "postgres")] +use testcontainers::core::IntoContainerPort; +#[cfg(feature = "postgres")] +use testcontainers::runners::AsyncRunner; +#[cfg(feature = "postgres")] +use testcontainers::ImageExt; +#[cfg(feature = "postgres")] +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +#[cfg(feature = "postgres")] +use tokio::sync::{mpsc, Mutex}; +#[cfg(feature = "postgres")] +use ContainerCommands::{FetchHost, FetchPort}; const TEST_DIRECTORY: &str = "test_files/"; +const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; +const SQLITE_PREFIX: &str = "sqlite"; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() - .build() - .unwrap() + .build()? .block_on(run_tests()) } +// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not +// If particular test wants to cover trailing whitespace on a value, +// it should project additional non-whitespace column on the right. #[allow(clippy::ptr_arg)] -fn normalizer(s: &String) -> String { - // Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not - // If particular test wants to cover trailing whitespace on a value, - // it should project additional non-whitespace column on the right. - s.trim_end().to_owned() +fn value_normalizer(s: &String) -> String { + s.trim_end().to_string() } -fn value_validator( +fn sqlite_value_validator( normalizer: Normalizer, actual: &[Vec], expected: &[String], ) -> bool { - let expected = expected.iter().map(normalizer).collect::>(); - let actual = actual + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual + .iter() + .map(|strs| strs.iter().map(normalizer).join(" ")) + .collect_vec(); + + if log_enabled!(Info) && normalized_actual != normalized_expected { + info!("sqlite validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + info!("[{i}] {}", normalized_actual[i]); + info!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected +} + +fn df_value_validator( + normalizer: Normalizer, + actual: &[Vec], + expected: &[String], +) -> bool { + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual .iter() .map(|strs| strs.iter().join(" ")) - // Editors do not preserve trailing whitespace, so expected may or may not lack it included - .map(|str| normalizer(&str)) - .collect::>(); - actual == expected + .map(|str| str.trim_end().to_string()) + .collect_vec(); + + if log_enabled!(Warn) && normalized_actual != normalized_expected { + warn!("df validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + warn!("[{i}] {}", normalized_actual[i]); + warn!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected } /// Sets up an empty directory at test_files/scratch/ /// creating it if needed and clearing any file contents if it exists /// This allows tests for inserting to external tables or copy to -/// to persist data to disk and have consistent state when running +/// persist data to disk and have consistent state when running /// a new test fn setup_scratch_dir(name: &Path) -> Result<()> { // go from copy.slt --> copy @@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> { } options.warn_on_ignored(); + #[cfg(feature = "postgres")] + let start_pg_database = options.postgres_runner && !is_pg_uri_set(); + #[cfg(feature = "postgres")] + if start_pg_database { + info!("Starting postgres db ..."); + + thread::spawn(|| { + execute_blocking(start_postgres( + &POSTGRES_IN, + &POSTGRES_HOST, + &POSTGRES_PORT, + &POSTGRES_STOPPED, + )) + }); + + POSTGRES_IN.tx.send(FetchHost).unwrap(); + let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap(); + + POSTGRES_IN.tx.send(FetchPort).unwrap(); + let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap(); + + let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test"); + info!("Postgres uri is {pg_uri}"); + + set_var("PG_URI", pg_uri); + } + // Run all tests in parallel, reporting failures at the end // // Doing so is safe because each slt file runs with its own // `SessionContext` and should not have side effects (like // modifying shared state like `/tmp/`) + let m = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1)); + let m_style = ProgressStyle::with_template( + "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ) + .unwrap() + .progress_chars("##-"); + + let start = Instant::now(); + let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?) .map(|test_file| { + let validator = if options.include_sqlite + && test_file.relative_path.starts_with(SQLITE_PREFIX) + { + sqlite_value_validator + } else { + df_value_validator + }; + + let m_clone = m.clone(); + let m_style_clone = m_style.clone(); + SpawnedTask::spawn(async move { - let file_path = test_file.relative_path.clone(); - let start = datafusion::common::instant::Instant::now(); match (options.postgres_runner, options.complete) { - (false, false) => run_test_file(test_file).await?, - (false, true) => run_complete_file(test_file).await?, - (true, false) => run_test_file_with_postgres(test_file).await?, - (true, true) => run_complete_file_with_postgres(test_file).await?, + (false, false) => { + run_test_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (false, true) => { + run_complete_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (true, false) => { + run_test_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } + (true, true) => { + run_complete_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } } - println!("Executed {:?}. Took {:?}", file_path, start.elapsed()); Ok(()) as Result<()> }) .join() @@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> { .collect() .await; + m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?; + + #[cfg(feature = "postgres")] + if start_pg_database { + println!("Stopping postgres db ..."); + POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(()); + POSTGRES_STOPPED.rx.lock().await.recv().await; + } + // report on any errors if !errors.is_empty() { for e in &errors { @@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> { } } -async fn run_test_file(test_file: TestFile) -> Result<()> { +#[cfg(feature = "postgres")] +fn is_pg_uri_set() -> bool { + match env::var("PG_URI") { + Ok(_) => true, + Err(_) => false, + } +} + +async fn run_test_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, } = test_file; - info!("Running with DataFusion runner: {}", path.display()); let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else { info!("Skipping: {}", path.display()); return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); - runner + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + + let res = runner .run_file_async(path) .await - .map_err(|e| DataFusionError::External(Box::new(e))) + .map_err(|e| DataFusionError::External(Box::new(e))); + + pb.finish_and_clear(); + + res +} + +fn get_record_count(path: &PathBuf, label: String) -> u64 { + let records: Vec::ColumnType>> = + parse_file(path).unwrap(); + let mut count: u64 = 0; + + records.iter().for_each(|rec| match rec { + Record::Query { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + Record::Statement { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + _ => {} + }); + + count } #[cfg(feature = "postgres")] -async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, relative_path, } = test_file; - info!("Running with Postgres runner: {}", path.display()); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); runner .run_file_async(path) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + + pb.finish_and_clear(); + Ok(()) } #[cfg(not(feature = "postgres"))] -async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } -async fn run_complete_file(test_file: TestFile) -> Result<()> { +async fn run_complete_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, @@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> { return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(feature = "postgres")] -async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, @@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { path.display() ); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); + runner.with_column_validator(strict_column_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(not(feature = "postgres"))] -async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } @@ -282,11 +557,14 @@ struct TestFile { impl TestFile { fn new(path: PathBuf) -> Self { - let relative_path = PathBuf::from( - path.to_string_lossy() - .strip_prefix(TEST_DIRECTORY) - .unwrap_or(""), - ); + let p = path.to_string_lossy(); + let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) { + p.strip_prefix(TEST_DIRECTORY).unwrap() + } else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) { + p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap() + } else { + "" + }); Self { path, @@ -298,6 +576,14 @@ impl TestFile { self.path.extension() == Some(OsStr::new("slt")) } + fn check_sqlite(&self, options: &Options) -> bool { + if !self.relative_path.starts_with(SQLITE_PREFIX) { + return true; + } + + options.include_sqlite + } + fn check_tpch(&self, options: &Options) -> bool { if !self.relative_path.starts_with("tpch") { return true; @@ -310,15 +596,29 @@ impl TestFile { fn read_test_files<'a>( options: &'a Options, ) -> Result + 'a>> { - Ok(Box::new( - read_dir_recursive(TEST_DIRECTORY)? + let mut paths = read_dir_recursive(TEST_DIRECTORY)? + .into_iter() + .map(TestFile::new) + .filter(|f| options.check_test_file(&f.relative_path)) + .filter(|f| f.is_slt_file()) + .filter(|f| f.check_tpch(options)) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + if options.include_sqlite { + let mut sqlite_paths = read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)? .into_iter() .map(TestFile::new) .filter(|f| options.check_test_file(&f.relative_path)) .filter(|f| f.is_slt_file()) - .filter(|f| f.check_tpch(options)) - .filter(|f| options.check_pg_compat_file(f.path.as_path())), - )) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + + paths.append(&mut sqlite_paths) + } + + Ok(Box::new(paths.into_iter())) } fn read_dir_recursive>(path: P) -> Result> { @@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec, path: &Path) -> Result<()> { /// Parsed command line options /// -/// This structure attempts to mimic the command line options of the built in rust test runner +/// This structure attempts to mimic the command line options of the built-in rust test runner /// accepted by IDEs such as CLion that pass arguments /// /// See for more details @@ -367,6 +667,9 @@ struct Options { )] postgres_runner: bool, + #[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")] + include_sqlite: bool, + #[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")] include_tpch: bool, @@ -431,10 +734,13 @@ impl Options { .any(|filter| relative_path.to_string_lossy().contains(filter)) } - /// Postgres runner executes only tests in files with specific names + /// Postgres runner executes only tests in files with specific names or in + /// specific folders fn check_pg_compat_file(&self, path: &Path) -> bool { let file_name = path.file_name().unwrap().to_str().unwrap().to_string(); - !self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + !self.postgres_runner + || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + || (self.include_sqlite && path.to_string_lossy().contains(SQLITE_PREFIX)) } /// Logs warning messages to stdout if any ignored options are passed @@ -452,3 +758,87 @@ impl Options { } } } + +#[cfg(feature = "postgres")] +pub async fn start_postgres( + in_channel: &Channel, + host_channel: &Channel, + port_channel: &Channel, + stopped_channel: &Channel<()>, +) { + info!("Starting postgres test container with user postgres/postgres and db test"); + + let container = testcontainers_modules::postgres::Postgres::default() + .with_user("postgres") + .with_password("postgres") + .with_db_name("test") + .with_mapped_port(16432, 5432.tcp()) + .with_tag("17-alpine") + .start() + .await + .unwrap(); + // uncomment this if you are running docker in docker + // let host = "host.docker.internal".to_string(); + let host = container.get_host().await.unwrap().to_string(); + let port = container.get_host_port_ipv4(5432).await.unwrap(); + + let mut rx = in_channel.rx.lock().await; + while let Some(command) = rx.recv().await { + match command { + FetchHost => host_channel.tx.send(host.clone()).unwrap(), + FetchPort => port_channel.tx.send(port).unwrap(), + ContainerCommands::Stop => { + container.stop().await.unwrap(); + stopped_channel.tx.send(()).unwrap(); + rx.close(); + } + } + } +} + +#[cfg(feature = "postgres")] +#[derive(Debug)] +pub enum ContainerCommands { + FetchHost, + FetchPort, + Stop, +} + +#[cfg(feature = "postgres")] +pub struct Channel { + pub tx: UnboundedSender, + pub rx: Mutex>, +} + +#[cfg(feature = "postgres")] +pub fn channel() -> Channel { + let (tx, rx) = mpsc::unbounded_channel(); + Channel { + tx, + rx: Mutex::new(rx), + } +} + +#[cfg(feature = "postgres")] +pub fn execute_blocking(f: F) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); +} + +#[cfg(feature = "postgres")] +pub struct HostPort { + pub host: String, + pub port: u16, +} + +#[cfg(feature = "postgres")] +static POSTGRES_IN: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_HOST: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_PORT: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_STOPPED: Lazy> = Lazy::new(channel); diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index ced497de22a7..58400280072c 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -239,6 +239,10 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { let key = dict.normalized_keys()[row]; Ok(cell_to_string(dict.values(), key)?) } + // only added because of a bug in v 1.0.4 (is) of lexical-write-integer + DataType::Int64 => { + Ok(format!("{}", get_row_value!(array::Int64Array, col, row))) + } _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 5c24b49cfe86..e696058484a9 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -18,26 +18,49 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; +use super::{error::Result, normalize, DFSqlLogicTestError}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::physical_plan::common::collect; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; -use log::info; +use indicatif::ProgressBar; +use log::Level::{Debug, Info}; +use log::{debug, log_enabled, warn}; use sqllogictest::DBOutput; - -use super::{error::Result, normalize, DFSqlLogicTestError}; +use tokio::time::Instant; use crate::engines::output::{DFColumnType, DFOutput}; pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, + pb: ProgressBar, } impl DataFusion { - pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self { - Self { ctx, relative_path } + pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self { + Self { + ctx, + relative_path, + pb, + } + } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // third match will be current slow count + current_count = split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); } } @@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion { type ColumnType = DFColumnType; async fn run(&mut self, sql: &str) -> Result { - info!( - "[{}] Running query: \"{}\"", - self.relative_path.display(), - sql - ); - run_query(&self.ctx, sql).await + if log_enabled!(Debug) { + debug!( + "[{}] Running query: \"{}\"", + self.relative_path.display(), + sql + ); + } + + let start = Instant::now(); + let result = run_query(&self.ctx, sql).await; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); + + if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) { + warn!( + "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"", + self.relative_path.display() + ); + } + + result } /// Engine name of current database. diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index a490488cd764..1439695d62c6 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. -/// Postgres engine implementation for sqllogictest. -use std::path::{Path, PathBuf}; -use std::str::FromStr; - use async_trait::async_trait; use bytes::Bytes; use futures::{SinkExt, StreamExt}; -use log::debug; +use log::{debug, info}; use sqllogictest::DBOutput; +/// Postgres engine implementation for sqllogictest. +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; use tokio::task::JoinHandle; use super::conversion::*; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; +use indicatif::ProgressBar; use postgres_types::Type; use rust_decimal::Decimal; +use tokio::time::Instant; use tokio_postgres::{Column, Row}; use types::PgRegtype; @@ -55,6 +57,7 @@ pub struct Postgres { join_handle: JoinHandle<()>, /// Relative test file path relative_path: PathBuf, + pb: ProgressBar, } impl Postgres { @@ -71,11 +74,11 @@ impl Postgres { /// ``` /// /// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format - pub async fn connect(relative_path: PathBuf) -> Result { + pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> Result { let uri = std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity); - debug!("Using postgres connection string: {uri}"); + info!("Using postgres connection string: {uri}"); let config = tokio_postgres::Config::from_str(&uri)?; @@ -113,6 +116,7 @@ impl Postgres { client, join_handle, relative_path, + pb, }) } @@ -181,6 +185,22 @@ impl Postgres { tx.commit().await?; Ok(DBOutput::StatementComplete(0)) } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // second match will be current slow count + current_count += split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); + } } /// remove single quotes from the start and end of the string @@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str { /// return a schema name fn schema_name(relative_path: &Path) -> String { relative_path - .file_name() - .map(|name| { - name.to_string_lossy() - .chars() - .filter(|ch| ch.is_ascii_alphabetic()) - .collect::() - .trim_start_matches("pg_") - .to_string() - }) - .unwrap_or_else(|| "default_schema".to_string()) + .to_string_lossy() + .to_string() + .chars() + .filter(|ch| ch.is_ascii_alphanumeric()) + .collect::() + .trim_start_matches("pg_") + .to_string() } impl Drop for Postgres { @@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres { &mut self, sql: &str, ) -> Result, Self::Error> { - println!( + debug!( "[{}] Running query: \"{}\"", self.relative_path.display(), sql @@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres { }; if lower_sql.starts_with("copy") { + self.pb.inc(1); return self.run_copy_command(sql).await; } if !is_query_sql { self.client.execute(sql, &[]).await?; + self.pb.inc(1); return Ok(DBOutput::StatementComplete(0)); } + let start = Instant::now(); let rows = self.client.query(sql, &[]).await?; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); let types: Vec = if rows.is_empty() { self.client From c4650b9694e2055b5736dc9323143b23696f2daf Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 29 Dec 2024 21:32:42 +0000 Subject: [PATCH 3/7] Force version 0.24.0 of sqllogictest dependency until issue with labels is fixed. --- datafusion/sqllogictest/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 88afdfceb412..e1ca8441436b 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -53,7 +53,7 @@ once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } -sqllogictest = "0.25.0" +sqllogictest = "=0.24.0" sqlparser = { workspace = true } tempfile = { workspace = true } testcontainers = { version = "0.23", features = ["default"], optional = true } From ba62846a7463c0b43f8f0f587fff134ca3057565 Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 29 Dec 2024 21:32:49 +0000 Subject: [PATCH 4/7] Removed workaround for bug that was fixed. --- .../sqllogictest/src/engines/datafusion_engine/normalize.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs index 58400280072c..ced497de22a7 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/normalize.rs @@ -239,10 +239,6 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result { let key = dict.normalized_keys()[row]; Ok(cell_to_string(dict.values(), key)?) } - // only added because of a bug in v 1.0.4 (is) of lexical-write-integer - DataType::Int64 => { - Ok(format!("{}", get_row_value!(array::Int64Array, col, row))) - } _ => { let f = ArrayFormatter::try_new(col.as_ref(), &DEFAULT_FORMAT_OPTIONS); Ok(f.unwrap().value(row).to_string()) From d452068ed3076a64fd4ce57cb77749ad6675fa0e Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 29 Dec 2024 21:33:16 +0000 Subject: [PATCH 5/7] Git submodule update ... err update, link to sqlite tests. --- datafusion/sqllogictest/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 124735c89d87..4a7dc09d7dd1 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -35,7 +35,7 @@ directory of the datafusion-testing crate. 1. `rustup update stable` DataFusion uses the latest stable release of rust 2. `git submodule init` -3. `git submodule update` +3. `git submodule update --init --remote --recursive` ## Running tests: TLDR Examples @@ -218,7 +218,7 @@ INCLUDE_TPCH=true cargo test --test sqllogictests ## Running Tests: `sqlite` Test files in `data/sqlite` directory of the datafusion-testing crate were -sourced from the sqlite test suite and have been cleansed and updated to +sourced from the [sqlite test suite](https://www.sqlite.org/sqllogictest/dir?ci=tip) and have been cleansed and updated to run within DataFusion's sqllogictest runner. To run the sqlite tests you need to increase the rust stack size and add From c297c81acea983173ef589cbfc44708354fb097d Mon Sep 17 00:00:00 2001 From: Bruce Ritchie Date: Sun, 29 Dec 2024 22:10:59 +0000 Subject: [PATCH 6/7] Git submodule update --- docs/source/contributor-guide/getting_started.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/contributor-guide/getting_started.md b/docs/source/contributor-guide/getting_started.md index 696d6d3a0fe2..5d85e07f3eaa 100644 --- a/docs/source/contributor-guide/getting_started.md +++ b/docs/source/contributor-guide/getting_started.md @@ -74,7 +74,7 @@ Testing setup: - `rustup update stable` DataFusion uses the latest stable release of rust - `git submodule init` -- `git submodule update` +- `git submodule update --init --remote --recursive` Formatting instructions: From 5230c8bfc9e2487979d6b0e724f3a6d99fdf37b0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 30 Dec 2024 06:20:38 -0500 Subject: [PATCH 7/7] Readd submodule --- datafusion-testing | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) mode change 100755 => 160000 datafusion-testing diff --git a/datafusion-testing b/datafusion-testing deleted file mode 100755 index 8ec2a3ebcfea..000000000000 --- a/datafusion-testing +++ /dev/null @@ -1 +0,0 @@ -e2e320c9477a6d8ab09662eae255887733c0e304 \ No newline at end of file diff --git a/datafusion-testing b/datafusion-testing new file mode 160000 index 000000000000..e2e320c9477a --- /dev/null +++ b/datafusion-testing @@ -0,0 +1 @@ +Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304