diff --git a/Cargo.lock b/Cargo.lock index a57b7597b8..1a249fc53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "arrow" version = "52.0.0" @@ -328,6 +334,24 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "async-compression" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" +dependencies = [ + "bzip2", + "flate2", + "futures-core", + "futures-io", + "memchr", + "pin-project-lite", + "tokio", + "xz2", + "zstd", + "zstd-safe", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -440,6 +464,37 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + +[[package]] +name = "blake3" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cca6d3674597c30ddf2c587bf8d9d65c9a84d2326d941cc79c9842dfe0ef52" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "brotli" version = "6.0.0" @@ -690,6 +745,12 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "constant_time_eq" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" + [[package]] name = "core-foundation" version = "0.9.4" @@ -706,6 +767,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -803,6 +873,16 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "csv" version = "1.3.0" @@ -824,6 +904,73 @@ dependencies = [ "memchr", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "datafusion" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f92d2d7a9cba4580900b32b009848d9eb35f1028ac84cdd6ddcf97612cd0068" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-ipc", + "arrow-schema", + "async-compression", + "async-trait", + "bytes", + "bzip2", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-common-runtime", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "datafusion-functions-aggregate", + "datafusion-functions-array", + "datafusion-optimizer", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "datafusion-physical-plan", + "datafusion-sql", + "flate2", + "futures", + "glob", + "half", + "hashbrown", + "indexmap", + "itertools 0.12.1", + "log", + "num_cpus", + "object_store", + "parking_lot", + "parquet", + "paste", + "pin-project-lite", + "rand", + "sqlparser", + "tempfile", + "tokio", + "tokio-util", + "url", + "uuid", + "xz2", + "zstd", +] + [[package]] name = "datafusion-common" version = "39.0.0" @@ -841,9 +988,41 @@ dependencies = [ "instant", "libc", "num_cpus", + "object_store", + "parquet", "sqlparser", ] +[[package]] +name = "datafusion-common-runtime" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0091318129dad1359f08e4c6c71f855163c35bba05d1dbf983196f727857894" +dependencies = [ + "tokio", +] + +[[package]] +name = "datafusion-execution" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8385aba84fc4a06d3ebccfbcbf9b4f985e80c762fac634b49079f7cc14933fb1" +dependencies = [ + "arrow", + "chrono", + "dashmap", + "datafusion-common", + "datafusion-expr", + "futures", + "hashbrown", + "log", + "object_store", + "parking_lot", + "rand", + "tempfile", + "url", +] + [[package]] name = "datafusion-expr" version = "39.0.0" @@ -863,6 +1042,184 @@ dependencies = [ "strum_macros", ] +[[package]] +name = "datafusion-functions" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27c081ae5b7edd712b92767fb8ed5c0e32755682f8075707666cd70835807c0b" +dependencies = [ + "arrow", + "base64", + "blake2", + "blake3", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown", + "hex", + "itertools 0.12.1", + "log", + "md-5", + "rand", + "regex", + "sha2", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "datafusion-functions-aggregate" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feb28a4ea52c28a26990646986a27c4052829a2a2572386258679e19263f8b78" +dependencies = [ + "ahash", + "arrow", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr-common", + "log", + "paste", + "sqlparser", +] + +[[package]] +name = "datafusion-functions-array" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b17c02a74cdc87380a56758ec27e7d417356bf806f33062700908929aedb8a" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions", + "itertools 0.12.1", + "log", + "paste", +] + +[[package]] +name = "datafusion-optimizer" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12172f2a6c9eb4992a51e62d709eeba5dedaa3b5369cce37ff6c2260e100ba76" +dependencies = [ + "arrow", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-expr", + "datafusion-physical-expr", + "hashbrown", + "indexmap", + "itertools 0.12.1", + "log", + "regex-syntax", +] + +[[package]] +name = "datafusion-physical-expr" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a3fce531b623e94180f6cd33d620ef01530405751b6ddd2fd96250cdbd78e2e" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "arrow-string", + "base64", + "chrono", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate", + "datafusion-physical-expr-common", + "half", + "hashbrown", + "hex", + "indexmap", + "itertools 0.12.1", + "log", + "paste", + "petgraph", + "regex", +] + +[[package]] +name = "datafusion-physical-expr-common" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046400b6a2cc3ed57a7c576f5ae6aecc77804ac8e0186926b278b189305b2a77" +dependencies = [ + "arrow", + "datafusion-common", + "datafusion-expr", + "rand", +] + +[[package]] +name = "datafusion-physical-plan" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aed47f5a2ad8766260befb375b201592e86a08b260256e168ae4311426a2bff" +dependencies = [ + "ahash", + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-ord", + "arrow-schema", + "async-trait", + "chrono", + "datafusion-common", + "datafusion-common-runtime", + "datafusion-execution", + "datafusion-expr", + "datafusion-functions-aggregate", + "datafusion-physical-expr", + "datafusion-physical-expr-common", + "futures", + "half", + "hashbrown", + "indexmap", + "itertools 0.12.1", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "rand", + "tokio", +] + +[[package]] +name = "datafusion-sql" +version = "39.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fa92bb1fd15e46ce5fb6f1c85f3ac054592560f294429a28e392b5f9cd4255e" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "datafusion-common", + "datafusion-expr", + "log", + "regex", + "sqlparser", + "strum", +] + [[package]] name = "deranged" version = "0.3.11" @@ -873,14 +1230,14 @@ dependencies = [ ] [[package]] -name = "displaydoc" -version = "0.2.4" +name = "digest" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", + "block-buffer", + "crypto-common", + "subtle", ] [[package]] @@ -908,6 +1265,12 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "either" version = "1.12.0" @@ -1144,6 +1507,16 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -1163,6 +1536,12 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.4.5" @@ -1221,6 +1600,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.1.0" @@ -1270,6 +1655,12 @@ dependencies = [ "libm", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.3.1" @@ -1366,134 +1757,14 @@ dependencies = [ "cc", ] -[[package]] -name = "icu_collections" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" -dependencies = [ - "displaydoc", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_locid" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" -dependencies = [ - "displaydoc", - "litemap", - "tinystr", - "writeable", - "zerovec", -] - -[[package]] -name = "icu_locid_transform" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_locid_transform_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_locid_transform_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" - -[[package]] -name = "icu_normalizer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_normalizer_data", - "icu_properties", - "icu_provider", - "smallvec", - "utf16_iter", - "utf8_iter", - "write16", - "zerovec", -] - -[[package]] -name = "icu_normalizer_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" - -[[package]] -name = "icu_properties" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f8ac670d7422d7f76b32e17a5db556510825b29ec9154f235977c9caba61036" -dependencies = [ - "displaydoc", - "icu_collections", - "icu_locid_transform", - "icu_properties_data", - "icu_provider", - "tinystr", - "zerovec", -] - -[[package]] -name = "icu_properties_data" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" - -[[package]] -name = "icu_provider" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" -dependencies = [ - "displaydoc", - "icu_locid", - "icu_provider_macros", - "stable_deref_trait", - "tinystr", - "writeable", - "yoke", - "zerofrom", - "zerovec", -] - -[[package]] -name = "icu_provider_macros" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "idna" -version = "1.0.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4716a3a0933a1d01c2f72450e89596eb51dd34ef3c211ccd875acdf1f8fe47ed" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ - "icu_normalizer", - "icu_properties", - "smallvec", - "utf8_iter", + "unicode-bidi", + "unicode-normalization", ] [[package]] @@ -1696,12 +1967,6 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" -[[package]] -name = "litemap" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" - [[package]] name = "lock_api" version = "0.4.12" @@ -1727,12 +1992,33 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "matchit" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -2001,6 +2287,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.12.1", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -2114,15 +2421,18 @@ dependencies = [ "bytes", "chrono", "flate2", + "futures", "half", "hashbrown", "lz4_flex", "num", "num-bigint", + "object_store", "paste", "seq-macro", "snap", "thrift", + "tokio", "twox-hash", "zstd", "zstd-sys", @@ -2844,6 +3154,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -2886,6 +3207,28 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snafu" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "snap" version = "1.1.1" @@ -2929,12 +3272,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "static_assertions" version = "1.1.0" @@ -2997,17 +3334,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" -[[package]] -name = "synstructure" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "system-configuration" version = "0.5.1" @@ -3139,16 +3465,6 @@ dependencies = [ "crunchy", ] -[[package]] -name = "tinystr" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" -dependencies = [ - "displaydoc", - "zerovec", -] - [[package]] name = "tinytemplate" version = "1.2.1" @@ -3159,6 +3475,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.38.0" @@ -3285,9 +3616,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "tracing-core" version = "0.1.32" @@ -3313,12 +3656,39 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + +[[package]] +name = "unicode-bidi" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" + [[package]] name = "unicode-ident" version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +[[package]] +name = "unicode-normalization" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + [[package]] name = "unicode-width" version = "0.1.13" @@ -3339,27 +3709,15 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.1" +version = "2.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c25da092f0a868cdf09e8674cd3b7ef3a7d92a24253e663a2fb85e2496de56" +checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", "idna", "percent-encoding", ] -[[package]] -name = "utf16_iter" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" - -[[package]] -name = "utf8_iter" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" - [[package]] name = "uuid" version = "1.8.0" @@ -3401,6 +3759,7 @@ version = "0.1.0" dependencies = [ "arrow-array", "arrow-buffer", + "arrow-cast", "arrow-schema", "build-vortex", "criterion", @@ -3437,6 +3796,27 @@ dependencies = [ "flexbuffers", ] +[[package]] +name = "vortex-datafusion" +version = "0.1.0" +dependencies = [ + "arrow-array", + "arrow-schema", + "async-trait", + "datafusion", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "datafusion-physical-expr", + "datafusion-physical-plan", + "futures", + "pin-project", + "tokio", + "vortex-array", + "vortex-dtype", + "vortex-error", +] + [[package]] name = "vortex-datetime-parts" version = "0.1.0" @@ -3993,39 +4373,12 @@ dependencies = [ ] [[package]] -name = "write16" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" - -[[package]] -name = "writeable" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" - -[[package]] -name = "yoke" -version = "0.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" -dependencies = [ - "serde", - "stable_deref_trait", - "yoke-derive", - "zerofrom", -] - -[[package]] -name = "yoke-derive" -version = "0.7.4" +name = "xz2" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", - "synstructure", + "lzma-sys", ] [[package]] @@ -4048,55 +4401,12 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "zerofrom" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" -dependencies = [ - "zerofrom-derive", -] - -[[package]] -name = "zerofrom-derive" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", - "synstructure", -] - [[package]] name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -[[package]] -name = "zerovec" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2cc8827d6c0994478a15c53f374f46fbd41bea663d809b14744bc42e6b109c" -dependencies = [ - "yoke", - "zerofrom", - "zerovec-derive", -] - -[[package]] -name = "zerovec-derive" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] - [[package]] name = "zigzag" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 1bc655ff13..ac85667a71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "pyvortex", "vortex-array", "vortex-buffer", + "vortex-datafusion", "vortex-dtype", "vortex-error", "vortex-expr", @@ -36,11 +37,13 @@ arrayref = "0.3.7" arrow = { version = "52.0.0", features = ["pyarrow"] } arrow-array = "52.0.0" arrow-buffer = "52.0.0" +arrow-cast = "52.0.0" arrow-csv = "52.0.0" arrow-data = "52.0.0" arrow-ipc = "52.0.0" arrow-schema = "52.0.0" arrow-select = "52.0.0" +async-trait = "0.1" bindgen = "0.69.4" bytes = "1.6.0" bzip2 = "0.4.4" @@ -48,8 +51,13 @@ cargo_metadata = "0.18.1" criterion = { version = "0.5.1", features = ["html_reports"] } croaring = "1.0.1" csv = "1.3.0" +datafusion = "39.0.0" datafusion-common = "39.0.0" +datafusion-execution = "39.0.0" datafusion-expr = "39.0.0" +datafusion-physical-expr = "39.0.0" +datafusion-physical-plan = "39.0.0" +derive_builder = "0.20.0" divan = "0.1.14" duckdb = { version = "0.10.1", features = ["bundled"] } enum-iterator = "2.0.0" diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index b59fc0cb04..fa2f178cab 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -28,7 +28,7 @@ log = { workspace = true } parquet = { workspace = true, features = [] } reqwest = { workspace = true } simplelog = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["full"] } uuid = { workspace = true, features = ["v4"] } vortex-alp = { path = "../encodings/alp" } vortex-array = { path = "../vortex-array" } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 99ff92e0bb..b6781ad335 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -216,7 +216,6 @@ mod test { use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use vortex::arrow::FromArrowArray; use vortex::compress::Compressor; - use vortex::compute::as_arrow::as_arrow; use vortex::{ArrayData, IntoArray}; use crate::taxi_data::taxi_data_parquet; @@ -240,7 +239,7 @@ mod test { let struct_arrow: ArrowStructArray = record_batch.into(); let arrow_array: ArrowArrayRef = Arc::new(struct_arrow); let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array(); - let vortex_as_arrow = as_arrow(&vortex_array).unwrap(); + let vortex_as_arrow = vortex_array.flatten().unwrap().into_arrow(); assert_eq!(vortex_as_arrow.deref(), arrow_array.deref()); } } @@ -260,7 +259,7 @@ mod test { let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array(); let compressed = Compressor::new(&CTX).compress(&vortex_array, None).unwrap(); - let compressed_as_arrow = as_arrow(&compressed).unwrap(); + let compressed_as_arrow = compressed.flatten().unwrap().into_arrow(); assert_eq!(compressed_as_arrow.deref(), arrow_array.deref()); } } diff --git a/pyvortex/src/vortex_arrow.rs b/pyvortex/src/vortex_arrow.rs index 4b8dc2c8e3..f9cb05c1af 100644 --- a/pyvortex/src/vortex_arrow.rs +++ b/pyvortex/src/vortex_arrow.rs @@ -1,14 +1,12 @@ -use arrow::array::Array as ArrowArray; +use arrow::array::{Array as ArrowArray, ArrayRef}; use arrow::error::ArrowError; use arrow::pyarrow::ToPyArrow; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{IntoPyDict, PyList}; -use vortex::compute::as_arrow::as_arrow_chunks; +use vortex::array::chunked::ChunkedArray; use vortex::Array; -use crate::error::PyVortexError; - pub fn map_arrow_err(error: ArrowError) -> PyErr { PyValueError::new_err(error.to_string()) } @@ -16,7 +14,14 @@ pub fn map_arrow_err(error: ArrowError) -> PyErr { pub fn export_array<'py>(py: Python<'py>, array: &Array) -> PyResult> { // NOTE(ngates): for struct arrays, we could also return a RecordBatchStreamReader. // NOTE(robert): Return RecordBatchStreamReader always? - let chunks = as_arrow_chunks(array).map_err(PyVortexError::map_err)?; + let chunks: Vec = if let Ok(chunked_array) = ChunkedArray::try_from(array) { + chunked_array + .chunks() + .map(|chunk| chunk.flatten().unwrap().into_arrow()) + .collect() + } else { + vec![array.clone().flatten().unwrap().into_arrow()] + }; if chunks.is_empty() { return Err(PyValueError::new_err("No chunks in array")); } diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 4c7238ab09..7904d72c7e 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -21,6 +21,7 @@ workspace = true [dependencies] arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } arrow-schema = { workspace = true } enum-iterator = { workspace = true } flatbuffers = { workspace = true } diff --git a/vortex-array/src/array/bool/compute/as_arrow.rs b/vortex-array/src/array/bool/compute/as_arrow.rs deleted file mode 100644 index 47ff558e34..0000000000 --- a/vortex-array/src/array/bool/compute/as_arrow.rs +++ /dev/null @@ -1,17 +0,0 @@ -use std::sync::Arc; - -use arrow_array::{ArrayRef as ArrowArrayRef, BooleanArray as ArrowBoolArray}; -use vortex_error::VortexResult; - -use crate::array::bool::BoolArray; -use crate::compute::as_arrow::AsArrowArray; -use crate::validity::ArrayValidity; - -impl AsArrowArray for BoolArray { - fn as_arrow(&self) -> VortexResult { - Ok(Arc::new(ArrowBoolArray::new( - self.boolean_buffer(), - self.logical_validity().to_null_buffer()?, - ))) - } -} diff --git a/vortex-array/src/array/bool/compute/mod.rs b/vortex-array/src/array/bool/compute/mod.rs index 36970c63c6..baea8e09cb 100644 --- a/vortex-array/src/array/bool/compute/mod.rs +++ b/vortex-array/src/array/bool/compute/mod.rs @@ -1,5 +1,4 @@ use crate::array::bool::BoolArray; -use crate::compute::as_arrow::AsArrowArray; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; use crate::compute::scalar_at::ScalarAtFn; @@ -7,7 +6,6 @@ use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -mod as_arrow; mod compare; mod fill; mod flatten; @@ -16,10 +14,6 @@ mod slice; mod take; impl ArrayCompute for BoolArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn compare(&self) -> Option<&dyn CompareFn> { Some(self) } diff --git a/vortex-array/src/array/datetime/localdatetime.rs b/vortex-array/src/array/datetime/localdatetime.rs index d332a851bb..b2ff7d4512 100644 --- a/vortex-array/src/array/datetime/localdatetime.rs +++ b/vortex-array/src/array/datetime/localdatetime.rs @@ -1,20 +1,10 @@ -use std::sync::Arc; - -use arrow_array::{ - ArrayRef as ArrowArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, -}; -use arrow_buffer::ScalarBuffer; use lazy_static::lazy_static; -use vortex_dtype::{DType, ExtDType, ExtID, PType}; +use vortex_dtype::{DType, ExtDType, ExtID}; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; use crate::array::datetime::TimeUnit; use crate::array::extension::ExtensionArray; -use crate::compute::as_arrow::AsArrowArray; -use crate::compute::cast::cast; -use crate::validity::ArrayValidity; -use crate::{Array, ArrayDType, ArrayData, ArrayTrait, IntoArrayData}; +use crate::{Array, ArrayDType, ArrayData, IntoArrayData}; lazy_static! { pub static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID); @@ -83,23 +73,6 @@ impl TryFrom<&ExtensionArray> for LocalDateTimeArray { } } -impl AsArrowArray for LocalDateTimeArray { - fn as_arrow(&self) -> VortexResult { - // A LocalDateTime maps to an Arrow Timestamp array with no timezone. - let timestamps = cast(&self.timestamps(), PType::I64.into())?.flatten_primitive()?; - let validity = timestamps.logical_validity().to_null_buffer()?; - let timestamps_len = timestamps.len(); - let buffer = ScalarBuffer::::new(timestamps.into_buffer().into(), 0, timestamps_len); - - Ok(match self.time_unit() { - TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), - TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(buffer, validity)), - TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(buffer, validity)), - TimeUnit::S => Arc::new(TimestampSecondArray::new(buffer, validity)), - }) - } -} - impl TryFrom<&Array> for LocalDateTimeArray { type Error = VortexError; diff --git a/vortex-array/src/array/extension/compute.rs b/vortex-array/src/array/extension/compute.rs index d7611d3f73..fb007fee78 100644 --- a/vortex-array/src/array/extension/compute.rs +++ b/vortex-array/src/array/extension/compute.rs @@ -1,10 +1,7 @@ -use arrow_array::ArrayRef as ArrowArrayRef; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; -use crate::array::datetime::LocalDateTimeArray; use crate::array::extension::ExtensionArray; -use crate::compute::as_arrow::AsArrowArray; use crate::compute::cast::CastFn; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; @@ -13,10 +10,6 @@ use crate::compute::ArrayCompute; use crate::{Array, IntoArray}; impl ArrayCompute for ExtensionArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { // It's not possible to cast an extension array to another type. // TODO(ngates): we should allow some extension arrays to implement a callback @@ -37,18 +30,6 @@ impl ArrayCompute for ExtensionArray { } } -impl AsArrowArray for ExtensionArray { - /// To support full compatability with Arrow, we hard-code the conversion of our datetime - /// arrays to Arrow's Timestamp arrays here. For all other extension arrays, we return an - /// Arrow extension array with the same definition. - fn as_arrow(&self) -> VortexResult { - match self.id().as_ref() { - "vortex.localdatetime" => LocalDateTimeArray::try_from(self)?.as_arrow(), - _ => vortex_bail!("Arrow extension arrays not yet supported"), - } - } -} - impl ScalarAtFn for ExtensionArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::extension( diff --git a/vortex-array/src/array/null/as_arrow.rs b/vortex-array/src/array/null/as_arrow.rs deleted file mode 100644 index f00fdef8be..0000000000 --- a/vortex-array/src/array/null/as_arrow.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Implementation of the [AsArrowArray] trait for [ConstantArray] that is representing -//! [DType::Null] values. - -use std::sync::Arc; - -use arrow_array::{ArrayRef as ArrowArrayRef, NullArray as ArrowNullArray}; -use vortex_error::VortexResult; - -use crate::array::null::NullArray; -use crate::compute::as_arrow::AsArrowArray; -use crate::ArrayTrait; - -impl AsArrowArray for NullArray { - fn as_arrow(&self) -> VortexResult { - let arrow_null = ArrowNullArray::new(self.len()); - Ok(Arc::new(arrow_null)) - } -} - -#[cfg(test)] -mod test { - use arrow_array::{Array, NullArray as ArrowNullArray}; - - use crate::array::null::NullArray; - use crate::arrow::FromArrowArray; - use crate::compute::as_arrow::AsArrowArray; - use crate::validity::{ArrayValidity, LogicalValidity}; - use crate::{ArrayData, ArrayTrait, IntoArray}; - - #[test] - fn test_round_trip() { - let arrow_nulls = ArrowNullArray::new(10); - let vortex_nulls = ArrayData::from_arrow(&arrow_nulls, true).into_array(); - - let vortex_nulls = NullArray::try_from(vortex_nulls).unwrap(); - assert_eq!(vortex_nulls.len(), 10); - assert!(matches!( - vortex_nulls.logical_validity(), - LogicalValidity::AllInvalid(10) - )); - - let to_arrow = vortex_nulls.as_arrow().unwrap(); - assert_eq!( - *to_arrow.as_any().downcast_ref::().unwrap(), - arrow_nulls - ); - } -} diff --git a/vortex-array/src/array/null/mod.rs b/vortex-array/src/array/null/mod.rs index 7eae7c4a2a..fb0d932617 100644 --- a/vortex-array/src/array/null/mod.rs +++ b/vortex-array/src/array/null/mod.rs @@ -5,7 +5,6 @@ use crate::validity::{ArrayValidity, LogicalValidity, Validity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; use crate::{impl_encoding, ArrayFlatten}; -mod as_arrow; mod compute; impl_encoding!("vortex.null", Null); diff --git a/vortex-array/src/array/primitive/compute/as_arrow.rs b/vortex-array/src/array/primitive/compute/as_arrow.rs deleted file mode 100644 index 9ed4006a82..0000000000 --- a/vortex-array/src/array/primitive/compute/as_arrow.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::sync::Arc; - -use arrow_array::{ - ArrayRef as ArrowArrayRef, ArrowPrimitiveType, PrimitiveArray as ArrowPrimitiveArray, -}; -use arrow_buffer::ScalarBuffer; -use vortex_dtype::PType; -use vortex_error::VortexResult; - -use crate::array::primitive::PrimitiveArray; -use crate::compute::as_arrow::AsArrowArray; -use crate::validity::ArrayValidity; -use crate::ArrayTrait; - -impl AsArrowArray for PrimitiveArray { - fn as_arrow(&self) -> VortexResult { - use arrow_array::types::*; - Ok(match self.ptype() { - PType::U8 => Arc::new(as_arrow_array_primitive::(self)?), - PType::U16 => Arc::new(as_arrow_array_primitive::(self)?), - PType::U32 => Arc::new(as_arrow_array_primitive::(self)?), - PType::U64 => Arc::new(as_arrow_array_primitive::(self)?), - PType::I8 => Arc::new(as_arrow_array_primitive::(self)?), - PType::I16 => Arc::new(as_arrow_array_primitive::(self)?), - PType::I32 => Arc::new(as_arrow_array_primitive::(self)?), - PType::I64 => Arc::new(as_arrow_array_primitive::(self)?), - PType::F16 => Arc::new(as_arrow_array_primitive::(self)?), - PType::F32 => Arc::new(as_arrow_array_primitive::(self)?), - PType::F64 => Arc::new(as_arrow_array_primitive::(self)?), - }) - } -} - -fn as_arrow_array_primitive( - array: &PrimitiveArray, -) -> VortexResult> { - Ok(ArrowPrimitiveArray::new( - ScalarBuffer::::new(array.buffer().clone().into(), 0, array.len()), - array.logical_validity().to_null_buffer()?, - )) -} diff --git a/vortex-array/src/array/primitive/compute/filter_indices.rs b/vortex-array/src/array/primitive/compute/filter_indices.rs index 0b2af9b1a8..d77f07987e 100644 --- a/vortex-array/src/array/primitive/compute/filter_indices.rs +++ b/vortex-array/src/array/primitive/compute/filter_indices.rs @@ -68,7 +68,6 @@ fn apply_predicate bool>( #[cfg(test)] mod test { - use itertools::Itertools; use vortex_dtype::field::FieldPath; use vortex_expr::{lit, Conjunction, FieldPathOperations}; @@ -80,13 +79,11 @@ mod test { } fn to_int_indices(filtered_primitive: BoolArray) -> Vec { - let filtered = filtered_primitive + filtered_primitive .boolean_buffer() - .iter() - .enumerate() - .flat_map(|(idx, v)| if v { Some(idx as u64) } else { None }) - .collect_vec(); - filtered + .set_indices() + .map(|i| i as u64) + .collect() } #[test] diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs index 637f63f366..e1ca9f3b07 100644 --- a/vortex-array/src/array/primitive/compute/mod.rs +++ b/vortex-array/src/array/primitive/compute/mod.rs @@ -1,5 +1,4 @@ use crate::array::primitive::PrimitiveArray; -use crate::compute::as_arrow::AsArrowArray; use crate::compute::cast::CastFn; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; @@ -11,7 +10,6 @@ use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -mod as_arrow; mod cast; mod compare; mod fill; @@ -23,10 +21,6 @@ mod subtract_scalar; mod take; impl ArrayCompute for PrimitiveArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { Some(self) } diff --git a/vortex-array/src/array/struct/compute.rs b/vortex-array/src/array/struct/compute.rs index 7b46317b5b..d873c9a7a8 100644 --- a/vortex-array/src/array/struct/compute.rs +++ b/vortex-array/src/array/struct/compute.rs @@ -1,15 +1,8 @@ -use std::sync::Arc; - -use arrow_array::{ - Array as ArrowArray, ArrayRef as ArrowArrayRef, StructArray as ArrowStructArray, -}; -use arrow_schema::{Field, Fields}; use itertools::Itertools; use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::r#struct::StructArray; -use crate::compute::as_arrow::{as_arrow, AsArrowArray}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; use crate::compute::take::{take, TakeFn}; @@ -17,10 +10,6 @@ use crate::compute::ArrayCompute; use crate::{Array, ArrayDType, IntoArray}; impl ArrayCompute for StructArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -34,34 +23,6 @@ impl ArrayCompute for StructArray { } } -impl AsArrowArray for StructArray { - fn as_arrow(&self) -> VortexResult { - let field_arrays: Vec = - self.children().map(|f| as_arrow(&f)).try_collect()?; - - let arrow_fields: Fields = self - .names() - .iter() - .zip(field_arrays.iter()) - .zip(self.dtypes().iter()) - .map(|((name, arrow_field), vortex_field)| { - Field::new( - &**name, - arrow_field.data_type().clone(), - vortex_field.is_nullable(), - ) - }) - .map(Arc::new) - .collect(); - - Ok(Arc::new(ArrowStructArray::new( - arrow_fields, - field_arrays, - None, - ))) - } -} - impl ScalarAtFn for StructArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::r#struct( diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs index f879eef959..042ec63ef0 100644 --- a/vortex-array/src/array/struct/mod.rs +++ b/vortex-array/src/array/struct/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use vortex_dtype::{FieldNames, Nullability, StructDType}; -use vortex_error::vortex_bail; +use vortex_error::{vortex_bail, vortex_err}; use crate::stats::ArrayStatisticsCompute; use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; @@ -98,6 +98,36 @@ impl StructArray { } } +impl StructArray { + /// Return a new StructArray with the given projection applied. + /// + /// Projection does not copy data arrays. Projection is defined by an ordinal array slice + /// which specifies the new ordering of columns in the struct. The projection can be used to + /// perform column re-ordering, deletion, or duplication at a logical level, without any data + /// copying. + /// + /// This function will return an error if the projection includes invalid column IDs. + pub fn project(self, projection: &[usize]) -> VortexResult { + let mut children = Vec::with_capacity(projection.len()); + let mut names = Vec::with_capacity(projection.len()); + + for column_idx in projection { + children.push( + self.field(*column_idx) + .ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?, + ); + names.push(self.names()[*column_idx].clone()); + } + + StructArray::try_new( + FieldNames::from(names.as_slice()), + children, + self.len(), + self.validity(), + ) + } +} + impl ArrayFlatten for StructArray { /// StructEncoding is the canonical form for a [DType::Struct] array, so return self. fn flatten(self) -> VortexResult { @@ -134,3 +164,53 @@ impl AcceptArrayVisitor for StructArray { impl ArrayStatisticsCompute for StructArray {} impl EncodingCompression for StructEncoding {} + +#[cfg(test)] +mod test { + use vortex_dtype::{DType, FieldName, FieldNames, Nullability}; + + use crate::array::bool::BoolArray; + use crate::array::primitive::PrimitiveArray; + use crate::array::r#struct::StructArray; + use crate::array::varbin::VarBinArray; + use crate::validity::Validity; + use crate::{ArrayTrait, IntoArray}; + + #[test] + fn test_project() { + let xs = PrimitiveArray::from_vec(vec![0i64, 1, 2, 3, 4], Validity::NonNullable); + let ys = VarBinArray::from_vec( + vec!["a", "b", "c", "d", "e"], + DType::Utf8(Nullability::NonNullable), + ); + let zs = BoolArray::from_vec(vec![true, true, true, false, false], Validity::NonNullable); + + let struct_a = StructArray::try_new( + FieldNames::from(["xs".into(), "ys".into(), "zs".into()]), + vec![xs.into_array(), ys.into_array(), zs.into_array()], + 5, + Validity::NonNullable, + ) + .unwrap(); + + let struct_b = struct_a.project(&[2usize, 0]).unwrap(); + assert_eq!( + struct_b.names().to_vec(), + vec![FieldName::from("zs"), FieldName::from("xs")], + ); + + assert_eq!(struct_b.len(), 5); + + let bools = BoolArray::try_from(struct_b.field(0).unwrap()).unwrap(); + assert_eq!( + bools.boolean_buffer().iter().collect::>(), + vec![true, true, true, false, false] + ); + + let prims = PrimitiveArray::try_from(struct_b.field(1).unwrap()).unwrap(); + assert_eq!( + prims.scalar_buffer::().to_vec(), + vec![0i64, 1, 2, 3, 4] + ); + } +} diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index 7d3c31401b..732271728e 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -1,32 +1,18 @@ -use std::sync::Arc; - -use arrow_array::{ - ArrayRef as ArrowArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray, -}; -use vortex_dtype::DType; -use vortex_dtype::PType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::varbin::{varbin_scalar, VarBinArray}; -use crate::arrow::wrappers::as_offset_buffer; -use crate::compute::as_arrow::AsArrowArray; -use crate::compute::cast::cast; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; use crate::validity::ArrayValidity; -use crate::{ArrayDType, ToArray}; +use crate::ArrayDType; mod slice; mod take; impl ArrayCompute for VarBinArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -40,56 +26,6 @@ impl ArrayCompute for VarBinArray { } } -impl AsArrowArray for VarBinArray { - fn as_arrow(&self) -> VortexResult { - // Ensure the offsets are either i32 or i64 - let offsets = self.offsets().flatten_primitive()?; - let offsets = match offsets.ptype() { - PType::I32 | PType::I64 => offsets, - // Unless it's u64, everything else can be converted into an i32. - // FIXME(ngates): do not copy offsets again - PType::U64 => cast(&offsets.to_array(), PType::I64.into())?.flatten_primitive()?, - _ => cast(&offsets.to_array(), PType::I32.into())?.flatten_primitive()?, - }; - let nulls = self.logical_validity().to_null_buffer()?; - - let data = self.bytes().flatten_primitive()?; - assert_eq!(data.ptype(), PType::U8); - let data = data.buffer(); - - // Switch on Arrow DType. - Ok(match self.dtype() { - DType::Binary(_) => match offsets.ptype() { - PType::I32 => Arc::new(BinaryArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - PType::I64 => Arc::new(LargeBinaryArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - _ => panic!("Invalid offsets type"), - }, - DType::Utf8(_) => match offsets.ptype() { - PType::I32 => Arc::new(StringArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - PType::I64 => Arc::new(LargeStringArray::new( - as_offset_buffer::(offsets), - data.into(), - nulls, - )), - _ => panic!("Invalid offsets type"), - }, - _ => vortex_bail!(MismatchedTypes: "utf8 or binary", self.dtype()), - }) - } -} - impl ScalarAtFn for VarBinArray { fn scalar_at(&self, index: usize) -> VortexResult { if self.is_valid(index) { diff --git a/vortex-array/src/array/varbinview/compute.rs b/vortex-array/src/array/varbinview/compute.rs index c482461f2d..c5ecba5787 100644 --- a/vortex-array/src/array/varbinview/compute.rs +++ b/vortex-array/src/array/varbinview/compute.rs @@ -1,17 +1,8 @@ -use std::sync::Arc; - -use arrow_array::{ArrayRef as ArrowArrayRef, BinaryViewArray, StringViewArray}; -use arrow_buffer::Buffer as ArrowBuffer; -use arrow_buffer::ScalarBuffer; -use itertools::Itertools; -use vortex_dtype::DType; -use vortex_dtype::PType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::varbin::varbin_scalar; use crate::array::varbinview::{VarBinViewArray, VIEW_SIZE}; -use crate::compute::as_arrow::AsArrowArray; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::slice::{slice, SliceFn}; use crate::compute::ArrayCompute; @@ -19,10 +10,6 @@ use crate::validity::ArrayValidity; use crate::{Array, ArrayDType, IntoArray, IntoArrayData}; impl ArrayCompute for VarBinViewArray { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -43,43 +30,6 @@ impl ScalarAtFn for VarBinViewArray { } } -impl AsArrowArray for VarBinViewArray { - fn as_arrow(&self) -> VortexResult { - // Views should be buffer of u8 - let views = self.views().flatten_primitive()?; - assert_eq!(views.ptype(), PType::U8); - let nulls = self.logical_validity().to_null_buffer()?; - - let data = (0..self.metadata().n_children) - .map(|i| self.bytes(i).flatten_primitive()) - .collect::>>()?; - if !data.is_empty() { - assert_eq!(data[0].ptype(), PType::U8); - assert!(data.iter().map(|d| d.ptype()).all_equal()); - } - - let data = data - .iter() - .map(|p| ArrowBuffer::from(p.buffer())) - .collect::>(); - - // Switch on Arrow DType. - Ok(match self.dtype() { - DType::Binary(_) => Arc::new(BinaryViewArray::new( - ScalarBuffer::::from(ArrowBuffer::from(views.buffer())), - data, - nulls, - )), - DType::Utf8(_) => Arc::new(StringViewArray::new( - ScalarBuffer::::from(ArrowBuffer::from(views.buffer())), - data, - nulls, - )), - _ => vortex_bail!(MismatchedTypes: "utf8 or binary", self.dtype()), - }) - } -} - impl SliceFn for VarBinViewArray { fn slice(&self, start: usize, stop: usize) -> VortexResult { Ok(Self::try_new( diff --git a/vortex-array/src/array/varbinview/mod.rs b/vortex-array/src/array/varbinview/mod.rs index 7fe0f3c6eb..7c30513cb0 100644 --- a/vortex-array/src/array/varbinview/mod.rs +++ b/vortex-array/src/array/varbinview/mod.rs @@ -1,17 +1,24 @@ use std::fmt::Formatter; +use std::ops::Deref; use std::{mem, slice}; use ::serde::{Deserialize, Serialize}; -use vortex_dtype::Nullability; +use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray}; +use arrow_buffer::{Buffer, ScalarBuffer}; +use arrow_schema::DataType; +use itertools::Itertools; +use vortex_dtype::{Nullability, PType}; use vortex_error::vortex_bail; use crate::array::primitive::PrimitiveArray; +use crate::array::varbin::VarBinArray; use crate::array::varbinview::builder::VarBinViewBuilder; +use crate::arrow::FromArrowArray; use crate::compute::slice::slice; use crate::validity::Validity; use crate::validity::{ArrayValidity, LogicalValidity, ValidityMetadata}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{impl_encoding, ArrayDType, ArrayFlatten}; +use crate::{impl_encoding, ArrayDType, ArrayData, ArrayFlatten}; mod accessor; mod builder; @@ -219,7 +226,55 @@ impl VarBinViewArray { impl ArrayFlatten for VarBinViewArray { fn flatten(self) -> VortexResult { - Ok(Flattened::VarBinView(self)) + let nullable = self.dtype().is_nullable(); + let arrow_self = as_arrow(self); + let arrow_varbin = arrow_cast::cast(arrow_self.deref(), &DataType::Utf8) + .expect("Utf8View must cast to Ut8f"); + let vortex_array = ArrayData::from_arrow(arrow_varbin, nullable).into_array(); + + Ok(Flattened::VarBin(VarBinArray::try_from(&vortex_array)?)) + } +} + +fn as_arrow(var_bin_view: VarBinViewArray) -> ArrayRef { + // Views should be buffer of u8 + let views = var_bin_view + .views() + .flatten_primitive() + .expect("views must be primitive"); + assert_eq!(views.ptype(), PType::U8); + let nulls = var_bin_view + .logical_validity() + .to_null_buffer() + .expect("null buffer"); + + let data = (0..var_bin_view.metadata().n_children) + .map(|i| var_bin_view.bytes(i).flatten_primitive()) + .collect::>>() + .expect("bytes arrays must be primitive"); + if !data.is_empty() { + assert_eq!(data[0].ptype(), PType::U8); + assert!(data.iter().map(|d| d.ptype()).all_equal()); + } + + let data = data + .iter() + .map(|p| Buffer::from(p.buffer())) + .collect::>(); + + // Switch on Arrow DType. + match var_bin_view.dtype() { + DType::Binary(_) => Arc::new(BinaryViewArray::new( + ScalarBuffer::::from(Buffer::from(views.buffer())), + data, + nulls, + )), + DType::Utf8(_) => Arc::new(StringViewArray::new( + ScalarBuffer::::from(Buffer::from(views.buffer())), + data, + nulls, + )), + _ => panic!("expected utf8 or binary, got {}", var_bin_view.dtype()), } } @@ -301,14 +356,12 @@ impl EncodingCompression for VarBinViewEncoding {} #[cfg(test)] mod test { - use arrow_array::array::StringViewArray as ArrowStringViewArray; use vortex_scalar::Scalar; use crate::array::varbinview::VarBinViewArray; - use crate::compute::as_arrow::as_arrow; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; - use crate::{ArrayTrait, IntoArray}; + use crate::{ArrayFlatten, ArrayTrait, Flattened, IntoArray}; #[test] pub fn varbin_view() { @@ -341,20 +394,14 @@ mod test { } #[test] - pub fn iter() { - let binary_array = - VarBinViewArray::from(vec!["hello world", "hello world this is a long string"]); - assert_eq!( - as_arrow(binary_array.array()) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .collect::>(), - ArrowStringViewArray::from(vec!["hello world", "hello world this is a long string",]) - .iter() - .collect::>() - ); + pub fn flatten_array() { + let binary_arr = VarBinViewArray::from(vec!["string1", "string2"]); + + let flattened = binary_arr.flatten().unwrap(); + assert!(matches!(flattened, Flattened::VarBin(_))); + + let var_bin = flattened.into_array(); + assert_eq!(scalar_at(&var_bin, 0).unwrap(), Scalar::from("string1")); + assert_eq!(scalar_at(&var_bin, 1).unwrap(), Scalar::from("string2")); } } diff --git a/vortex-array/src/compute/as_arrow.rs b/vortex-array/src/compute/as_arrow.rs deleted file mode 100644 index b6d4c65928..0000000000 --- a/vortex-array/src/compute/as_arrow.rs +++ /dev/null @@ -1,36 +0,0 @@ -use arrow_array::ArrayRef as ArrowArrayRef; -use vortex_error::{vortex_err, VortexResult}; - -use crate::array::chunked::ChunkedArray; -use crate::{Array, IntoArray}; - -pub trait AsArrowArray { - fn as_arrow(&self) -> VortexResult; -} - -pub fn as_arrow(array: &Array) -> VortexResult { - array.with_dyn(|a| { - // If as_arrow is implemented, then invoke that. - if let Some(a) = a.as_arrow() { - return a.as_arrow(); - } - - // Otherwise, flatten and try again. - let array = array.clone().flatten()?.into_array(); - a.as_arrow() - .map(|a| a.as_arrow()) - .unwrap_or_else(|| Err(vortex_err!(NotImplemented: "as_arrow", array.encoding().id()))) - }) -} - -// TODO(ngates): return a RecordBatchReader instead? -pub fn as_arrow_chunks(array: &Array) -> VortexResult> { - if let Ok(chunked) = ChunkedArray::try_from(array) { - chunked - .chunks() - .map(|a| as_arrow(&a)) - .collect::>>() - } else { - as_arrow(array).map(|a| vec![a]) - } -} diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs index 9f2f18df63..56fd00489b 100644 --- a/vortex-array/src/compute/mod.rs +++ b/vortex-array/src/compute/mod.rs @@ -1,4 +1,3 @@ -use as_arrow::AsArrowArray; use cast::CastFn; use compare::CompareFn; use fill::FillForwardFn; @@ -11,7 +10,6 @@ use take::TakeFn; use crate::compute::filter_indices::FilterIndicesFn; use crate::compute::scalar_subtract::SubtractScalarFn; -pub mod as_arrow; pub mod cast; pub mod compare; pub mod fill; @@ -24,10 +22,6 @@ pub mod slice; pub mod take; pub trait ArrayCompute { - fn as_arrow(&self) -> Option<&dyn AsArrowArray> { - None - } - fn cast(&self) -> Option<&dyn CastFn> { None } diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index b6d5ef7b1d..86cf907227 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -1,14 +1,32 @@ +use std::sync::Arc; + +use arrow_array::types::{ + Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, + UInt32Type, UInt64Type, UInt8Type, +}; +use arrow_array::{ + ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray as ArrowBoolArray, LargeBinaryArray, + LargeStringArray, NullArray as ArrowNullArray, PrimitiveArray as ArrowPrimitiveArray, + StringArray, StructArray as ArrowStructArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, +}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::{Field, Fields}; +use vortex_dtype::{DType, PType}; use vortex_error::VortexResult; use crate::array::bool::BoolArray; +use crate::array::datetime::{LocalDateTimeArray, TimeUnit}; use crate::array::extension::ExtensionArray; use crate::array::null::NullArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; use crate::array::varbin::VarBinArray; -use crate::array::varbinview::VarBinViewArray; +use crate::arrow::wrappers::as_offset_buffer; +use crate::compute::cast::cast; use crate::encoding::ArrayEncoding; -use crate::{Array, IntoArray}; +use crate::validity::ArrayValidity; +use crate::{Array, ArrayDType, ArrayTrait, IntoArray, ToArray}; /// The set of encodings that can be converted to Arrow with zero-copy. pub enum Flattened { @@ -17,10 +35,189 @@ pub enum Flattened { Primitive(PrimitiveArray), Struct(StructArray), VarBin(VarBinArray), - VarBinView(VarBinViewArray), + // TODO(aduffy): VarBinView is being disabled until execution engines improve their + // support for them, or we build better execution kernels of our own. + // Once DataFusion completes https://github.com/apache/datafusion/issues/10918, we should + // flip this to be the preferred "flat" encoding for all string and binary types. + // VarBinView(VarBinViewArray), Extension(ExtensionArray), } +impl Flattened { + /// Convert a flat array into its equivalent [ArrayRef](Arrow array). + /// + /// Scalar arrays such as Bool and Primitive flattened arrays though should convert with + /// zero copies, while more complex variants such as Struct may require allocations if its child + /// arrays require decompression. + pub fn into_arrow(self) -> ArrayRef { + match self { + Flattened::Null(a) => null_to_arrow(a), + Flattened::Bool(a) => bool_to_arrow(a), + Flattened::Primitive(a) => primitive_to_arrow(a), + Flattened::Struct(a) => struct_to_arrow(a), + Flattened::VarBin(a) => varbin_to_arrow(a), + Flattened::Extension(a) => match a.id().as_ref() { + "vortex.localdatetime" => local_date_time_to_arrow( + LocalDateTimeArray::try_from(&a.into_array()).expect("localdatetime"), + ), + _ => panic!("unsupported extension dtype with ID {}", a.id().as_ref()), + }, + } + } +} + +fn null_to_arrow(null_array: NullArray) -> ArrayRef { + Arc::new(ArrowNullArray::new(null_array.len())) +} + +fn bool_to_arrow(bool_array: BoolArray) -> ArrayRef { + Arc::new(ArrowBoolArray::new( + bool_array.boolean_buffer(), + bool_array + .logical_validity() + .to_null_buffer() + .expect("null buffer"), + )) +} + +fn primitive_to_arrow(primitive_array: PrimitiveArray) -> ArrayRef { + fn as_arrow_array_primitive( + array: &PrimitiveArray, + ) -> ArrowPrimitiveArray { + ArrowPrimitiveArray::new( + ScalarBuffer::::new(array.buffer().clone().into(), 0, array.len()), + array + .logical_validity() + .to_null_buffer() + .expect("null buffer"), + ) + } + + match primitive_array.ptype() { + PType::U8 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::U16 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::U32 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::U64 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::I8 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::I16 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::I32 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::I64 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::F16 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::F32 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + PType::F64 => Arc::new(as_arrow_array_primitive::(&primitive_array)), + } +} + +fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { + let field_arrays: Vec = struct_array + .children() + .map(|f| f.flatten().unwrap().into_arrow()) + .collect(); + + let arrow_fields: Fields = struct_array + .names() + .iter() + .zip(field_arrays.iter()) + .zip(struct_array.dtypes().iter()) + .map(|((name, arrow_field), vortex_field)| { + Field::new( + &**name, + arrow_field.data_type().clone(), + vortex_field.is_nullable(), + ) + }) + .map(Arc::new) + .collect(); + + Arc::new(ArrowStructArray::new(arrow_fields, field_arrays, None)) +} + +fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { + let offsets = varbin_array + .offsets() + .flatten_primitive() + .expect("flatten_primitive"); + let offsets = match offsets.ptype() { + PType::I32 | PType::I64 => offsets, + // Unless it's u64, everything else can be converted into an i32. + // FIXME(ngates): do not copy offsets again + PType::U64 => cast(&offsets.to_array(), PType::I64.into()) + .expect("cast to i64") + .flatten_primitive() + .expect("flatten_primitive"), + _ => cast(&offsets.to_array(), PType::I32.into()) + .expect("cast to i32") + .flatten_primitive() + .expect("flatten_primitive"), + }; + let nulls = varbin_array + .logical_validity() + .to_null_buffer() + .expect("null buffer"); + + let data = varbin_array + .bytes() + .flatten_primitive() + .expect("flatten_primitive"); + assert_eq!(data.ptype(), PType::U8); + let data = data.buffer(); + + // Switch on Arrow DType. + match varbin_array.dtype() { + DType::Binary(_) => match offsets.ptype() { + PType::I32 => Arc::new(BinaryArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + PType::I64 => Arc::new(LargeBinaryArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + _ => panic!("Invalid offsets type"), + }, + DType::Utf8(_) => match offsets.ptype() { + PType::I32 => Arc::new(StringArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + PType::I64 => Arc::new(LargeStringArray::new( + as_offset_buffer::(offsets), + data.into(), + nulls, + )), + _ => panic!("Invalid offsets type"), + }, + _ => panic!( + "expected utf8 or binary instead of {}", + varbin_array.dtype() + ), + } +} + +fn local_date_time_to_arrow(local_date_time_array: LocalDateTimeArray) -> ArrayRef { + // A LocalDateTime maps to an Arrow Timestamp array with no timezone. + let timestamps = cast(&local_date_time_array.timestamps(), PType::I64.into()) + .expect("timestamps must cast to i64") + .flatten_primitive() + .expect("must be i64 array"); + let validity = timestamps + .logical_validity() + .to_null_buffer() + .expect("null buffer"); + let timestamps_len = timestamps.len(); + let buffer = ScalarBuffer::::new(timestamps.into_buffer().into(), 0, timestamps_len); + + match local_date_time_array.time_unit() { + TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)), + TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(buffer, validity)), + TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(buffer, validity)), + TimeUnit::S => Arc::new(TimestampSecondArray::new(buffer, validity)), + } +} + /// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding. /// /// Flattening an Array ensures that the array's encoding matches one of the builtin canonical @@ -62,7 +259,6 @@ impl IntoArray for Flattened { Self::Struct(a) => a.into_array(), Self::VarBin(a) => a.into_array(), Self::Extension(a) => a.into_array(), - Self::VarBinView(a) => a.into_array(), } } } diff --git a/vortex-array/src/view.rs b/vortex-array/src/view.rs index 11e63778df..ffd971db52 100644 --- a/vortex-array/src/view.rs +++ b/vortex-array/src/view.rs @@ -35,7 +35,6 @@ impl Debug for ArrayView { f.debug_struct("ArrayView") .field("encoding", &self.encoding) .field("dtype", &self.dtype) - // .field("array", &self.array) .field("buffers", &self.buffers) .field("ctx", &self.ctx) .finish() diff --git a/vortex-buffer/src/string.rs b/vortex-buffer/src/string.rs index 520d3102f4..c2dfe0de13 100644 --- a/vortex-buffer/src/string.rs +++ b/vortex-buffer/src/string.rs @@ -3,12 +3,12 @@ use std::str::Utf8Error; use crate::Buffer; -/// A wrapper around a `Buffer` that guarantees that the buffer contains valid UTF-8. +/// A wrapper around a [`Buffer`] that guarantees that the buffer contains valid UTF-8. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd)] pub struct BufferString(Buffer); impl BufferString { - /// Creates a new `BufferString` from a `Buffer`. + /// Creates a new `BufferString` from a [`Buffer`]. /// /// # Safety /// Assumes that the buffer contains valid UTF-8. diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml new file mode 100644 index 0000000000..72c4caa5b0 --- /dev/null +++ b/vortex-datafusion/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "vortex-datafusion" +version.workspace = true +homepage.workspace = true +repository.workspace = true +authors.workspace = true +license.workspace = true +keywords.workspace = true +include.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +vortex-array = { path = "../vortex-array" } +vortex-dtype = { path = "../vortex-dtype" } +vortex-error = { path = "../vortex-error" } + +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +async-trait = { workspace = true } + +datafusion = { workspace = true } +datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-physical-expr = { workspace = true } +datafusion-physical-plan = { workspace = true } +futures = { workspace = true } +pin-project = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["test-util"] } + +[lints] +workspace = true diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs new file mode 100644 index 0000000000..e53fb27f89 --- /dev/null +++ b/vortex-datafusion/src/datatype.rs @@ -0,0 +1,198 @@ +//! Convert between Vortex [vortex_dtype::DType] and Apache Arrow [arrow_schema::DataType]. +//! +//! Apache Arrow's type system includes physical information, which could lead to ambiguities as +//! Vortex treats encodings as separate from logical types. +//! +//! [`infer_schema`] and its sibling [`infer_data_type`] use a simple algorithm, where every +//! logical type is encoded in its simplest corresponding Arrow type. This reflects the reality that +//! most compute engines don't make use of the entire type range arrow-rs supports. +//! +//! For this reason, it's recommended to do as much computation as possible within Vortex, and then +//! materialize an Arrow ArrayRef at the very end of the processing chain. + +use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder}; +use vortex_dtype::{DType, Nullability, PType}; + +pub(crate) fn infer_schema(dtype: &DType) -> Schema { + let DType::Struct(struct_dtype, nullable) = dtype else { + panic!("only DType::Struct can be converted to arrow schema"); + }; + + if *nullable != Nullability::NonNullable { + panic!("top-level struct in Schema must be NonNullable"); + } + + let mut builder = SchemaBuilder::with_capacity(struct_dtype.names().len()); + for (field_name, field_dtype) in struct_dtype + .names() + .iter() + .zip(struct_dtype.dtypes().iter()) + { + builder.push(FieldRef::from(Field::new( + field_name.to_string(), + infer_data_type(field_dtype), + field_dtype.is_nullable(), + ))); + } + + builder.finish() +} + +pub(crate) fn infer_data_type(dtype: &DType) -> DataType { + match dtype { + DType::Null => DataType::Null, + DType::Bool(_) => DataType::Boolean, + DType::Primitive(ptype, _) => match ptype { + PType::U8 => DataType::UInt8, + PType::U16 => DataType::UInt16, + PType::U32 => DataType::UInt32, + PType::U64 => DataType::UInt64, + PType::I8 => DataType::Int8, + PType::I16 => DataType::Int16, + PType::I32 => DataType::Int32, + PType::I64 => DataType::Int64, + PType::F16 => DataType::Float16, + PType::F32 => DataType::Float32, + PType::F64 => DataType::Float64, + }, + DType::Utf8(_) => DataType::Utf8, + DType::Binary(_) => DataType::Binary, + DType::Struct(struct_dtype, _) => { + let mut fields = Vec::with_capacity(struct_dtype.names().len()); + for (field_name, field_dt) in struct_dtype + .names() + .iter() + .zip(struct_dtype.dtypes().iter()) + { + fields.push(FieldRef::from(Field::new( + field_name.to_string(), + infer_data_type(field_dt), + field_dt.is_nullable(), + ))); + } + + DataType::Struct(Fields::from(fields)) + } + DType::List(list_dt, _) => { + let dtype: &DType = list_dt; + DataType::List(FieldRef::from(Field::new( + "element", + infer_data_type(dtype), + dtype.is_nullable(), + ))) + } + DType::Extension(..) => { + panic!("Extension DType conversion to Arrow not supported") + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::{DataType, Field, FieldRef, Fields, Schema}; + use vortex_dtype::{ + DType, ExtDType, ExtID, FieldName, FieldNames, Nullability, PType, StructDType, + }; + + use crate::datatype::{infer_data_type, infer_schema}; + + #[test] + fn test_dtype_conversion_success() { + assert_eq!(infer_data_type(&DType::Null), DataType::Null); + + assert_eq!( + infer_data_type(&DType::Bool(Nullability::NonNullable)), + DataType::Boolean + ); + + assert_eq!( + infer_data_type(&DType::Primitive(PType::U64, Nullability::NonNullable)), + DataType::UInt64 + ); + + assert_eq!( + infer_data_type(&DType::Utf8(Nullability::NonNullable)), + DataType::Utf8 + ); + + assert_eq!( + infer_data_type(&DType::Binary(Nullability::NonNullable)), + DataType::Binary + ); + + assert_eq!( + infer_data_type(&DType::List( + Arc::new(DType::Bool(Nullability::NonNullable)), + Nullability::Nullable, + )), + DataType::List(FieldRef::from(Field::new( + "element".to_string(), + DataType::Boolean, + false, + ))) + ); + + assert_eq!( + infer_data_type(&DType::Struct( + StructDType::new( + FieldNames::from(vec![FieldName::from("field_a"), FieldName::from("field_b")]), + vec![DType::Bool(false.into()), DType::Utf8(true.into())], + ), + Nullability::NonNullable, + )), + DataType::Struct(Fields::from(vec![ + FieldRef::from(Field::new("field_a", DataType::Boolean, false)), + FieldRef::from(Field::new("field_b", DataType::Utf8, true)), + ])) + ); + } + + #[test] + #[should_panic] + fn test_dtype_conversion_panics() { + let _ = infer_data_type(&DType::Extension( + ExtDType::new(ExtID::from("my-fake-ext-dtype"), None), + Nullability::NonNullable, + )); + } + + #[test] + fn test_schema_conversion() { + let struct_dtype = the_struct(); + let schema_nonnull = DType::Struct(struct_dtype.clone(), Nullability::NonNullable); + + assert_eq!( + infer_schema(&schema_nonnull), + Schema::new(Fields::from(vec![ + Field::new("field_a", DataType::Boolean, false), + Field::new("field_b", DataType::Utf8, false), + Field::new("field_c", DataType::Int32, true), + ])) + ); + } + + #[test] + #[should_panic] + fn test_schema_conversion_panics() { + let struct_dtype = the_struct(); + let schema_null = DType::Struct(struct_dtype.clone(), Nullability::Nullable); + let _ = infer_schema(&schema_null); + } + + fn the_struct() -> StructDType { + StructDType::new( + FieldNames::from([ + FieldName::from("field_a"), + FieldName::from("field_b"), + FieldName::from("field_c"), + ]), + vec![ + DType::Bool(Nullability::NonNullable), + DType::Utf8(Nullability::NonNullable), + DType::Primitive(PType::I32, Nullability::Nullable), + ], + ) + } +} diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs new file mode 100644 index 0000000000..6ce6c2af4f --- /dev/null +++ b/vortex-datafusion/src/lib.rs @@ -0,0 +1,307 @@ +//! Connectors to enable DataFusion to read Vortex data. + +use std::any::Any; +use std::fmt::Formatter; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::datasource::TableProvider; +use datafusion::execution::context::SessionState; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result as DFResult}; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, +}; +use futures::{Stream, StreamExt}; +use pin_project::pin_project; +use vortex::array::chunked::ChunkedArray; +use vortex::array::r#struct::StructArray; +use vortex::{Array, ArrayDType, ArrayFlatten, IntoArray}; +use vortex_dtype::DType; +use vortex_error::{vortex_bail, VortexResult}; + +use crate::datatype::infer_schema; + +mod datatype; + +/// A [`TableProvider`] that exposes an existing Vortex Array to the DataFusion SQL engine. +/// +/// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as +/// a table to DataFusion. +#[derive(Debug, Clone)] +pub(crate) struct VortexInMemoryTableProvider { + array: Array, + schema_ref: SchemaRef, +} + +impl VortexInMemoryTableProvider { + /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array. + pub fn try_new(array: Array) -> VortexResult { + if !matches!(array.dtype(), DType::Struct(_, _)) { + vortex_bail!(InvalidArgument: "only DType::Struct arrays can produce a table provider"); + } + + let arrow_schema = infer_schema(array.dtype()); + let schema_ref = SchemaRef::new(arrow_schema); + + Ok(Self { array, schema_ref }) + } +} + +#[async_trait] +impl TableProvider for VortexInMemoryTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema_ref) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + /// Plan an array scan. + /// + /// Currently, projection pushdown is supported, but not filter pushdown. + /// The array is flattened directly into the nearest Arrow-compatible encoding. + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + _limit: Option, + ) -> DFResult> { + if !filters.is_empty() { + return exec_err!("vortex does not support filter pushdown"); + } + + let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { + Partitioning::RoundRobinBatch(chunked_array.nchunks()) + } else { + Partitioning::UnknownPartitioning(1) + }; + + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(self.schema_ref.clone()), + partitioning, + ExecutionMode::Bounded, + ); + + Ok(Arc::new(VortexMemoryExec { + array: self.array.clone(), + projection: projection.cloned(), + plan_properties, + })) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DFResult> { + // TODO(aduffy): add support for filter pushdown + Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Unsupported) + .collect()) + } +} + +/// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. +#[derive(Debug, Clone)] +struct VortexMemoryExec { + array: Array, + projection: Option>, + plan_properties: PlanProperties, +} + +impl DisplayAs for VortexMemoryExec { + fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl VortexMemoryExec { + /// Read a single array chunk from the source as a RecordBatch. + /// + /// `array` must be a [`StructArray`] or flatten into one. Passing a different Array variant + /// may cause a panic. + fn execute_single_chunk( + array: Array, + projection: &Option>, + _context: Arc, + ) -> DFResult { + let data = array + .flatten() + .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))? + .into_array(); + + // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. + let struct_array = StructArray::try_from(data).expect("array must be StructArray"); + + let field_order = if let Some(projection) = projection { + projection.clone() + } else { + (0..struct_array.names().len()).collect() + }; + + let projected_struct = + struct_array + .project(field_order.as_slice()) + .map_err(|vortex_err| { + exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") + })?; + let batch = RecordBatch::from( + projected_struct + .flatten() + .expect("struct arrays must flatten") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ); + Ok(Box::pin(VortexRecordBatchStream { + schema_ref: batch.schema(), + inner: futures::stream::iter(vec![batch]), + })) + } +} + +#[pin_project] +struct VortexRecordBatchStream { + schema_ref: SchemaRef, + + #[pin] + inner: I, +} + +impl Stream for VortexRecordBatchStream +where + I: Stream, +{ + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match this.inner.poll_next_unpin(cx) { + Poll::Ready(Some(batch)) => Poll::Ready(Some(Ok(batch))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for VortexRecordBatchStream +where + I: Stream, +{ + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema_ref) + } +} + +impl ExecutionPlan for VortexMemoryExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + // Leaf node + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { + chunked_array + .chunk(partition) + .ok_or_else(|| exec_datafusion_err!("partition not found"))? + } else { + self.array.clone() + }; + + Self::execute_single_chunk(chunk, &self.projection, context) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use datafusion::arrow::array::AsArray; + use datafusion::arrow::datatypes::UInt64Type; + use datafusion::prelude::SessionContext; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::r#struct::StructArray; + use vortex::array::varbin::VarBinArray; + use vortex::validity::Validity; + use vortex::IntoArray; + use vortex_dtype::{DType, FieldName, Nullability}; + + use crate::VortexInMemoryTableProvider; + + #[tokio::test] + async fn test_datafusion_simple() { + let names = VarBinArray::from_vec( + vec!["Washington", "Adams", "Jefferson", "Madison", "Monroe"], + DType::Utf8(Nullability::NonNullable), + ); + let term_start = + PrimitiveArray::from_vec(vec![1789u16, 1797, 1801, 1809, 1817], Validity::NonNullable); + let presidents = StructArray::try_new( + Arc::new([FieldName::from("president"), FieldName::from("term_start")]), + vec![names.into_array(), term_start.into_array()], + 5, + Validity::NonNullable, + ) + .unwrap(); + + let presidents_table = + Arc::new(VortexInMemoryTableProvider::try_new(presidents.into_array()).unwrap()); + let session_ctx = SessionContext::new(); + + session_ctx + .register_table("presidents", presidents_table) + .unwrap(); + + let df_term_start = session_ctx + .sql("SELECT SUM(term_start) FROM presidents WHERE president <> 'Madison'") + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(df_term_start.len(), 1); + assert_eq!( + *df_term_start[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + vec![1789u64, 1797, 1801, 1817].into_iter().sum::() + ); + } +} diff --git a/vortex-dtype/src/dtype.rs b/vortex-dtype/src/dtype.rs index a308020fa1..c4e3822073 100644 --- a/vortex-dtype/src/dtype.rs +++ b/vortex-dtype/src/dtype.rs @@ -75,6 +75,10 @@ impl DType { pub fn eq_ignore_nullability(&self, other: &Self) -> bool { self.as_nullable().eq(&other.as_nullable()) } + + pub fn is_struct(&self) -> bool { + matches!(self, Struct(_, _)) + } } impl Display for DType { diff --git a/vortex-ipc/Cargo.toml b/vortex-ipc/Cargo.toml index ea08a4cb7b..f1d57ddded 100644 --- a/vortex-ipc/Cargo.toml +++ b/vortex-ipc/Cargo.toml @@ -18,7 +18,7 @@ futures-util = { workspace = true } itertools = { workspace = true } monoio = { workspace = true, optional = true, features = ["bytes"] } pin-project = { workspace = true } -tokio = { workspace = true, features = ["io-util"], optional = true } +tokio = { workspace = true, features = ["io-util", "fs"], optional = true } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } vortex-error = { path = "../vortex-error" } @@ -57,4 +57,4 @@ harness = false [[bench]] name = "ipc_array_reader_take" -harness = false \ No newline at end of file +harness = false