From 38881e4322b467c816a6c057bda7faf4e0b036f4 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 11 Sep 2022 16:25:42 +0800 Subject: [PATCH] feat(query): new input format framework. --- Cargo.lock | 5 + src/query/pipeline/sources/Cargo.toml | 5 + .../sources/input_formats/delimiter.rs | 50 +++ .../impls/input_format_parquet.rs | 138 ++++++++ .../input_formats/impls/input_format_tsv.rs | 116 +++++++ .../sources/input_formats/impls/mod.rs | 16 + .../sources/input_formats/input_context.rs | 188 +++++++++++ .../sources/input_formats/input_format.rs | 123 +++++++ .../input_formats/input_format_text.rs | 307 ++++++++++++++++++ .../sources/input_formats/input_pipeline.rs | 251 ++++++++++++++ .../processors/sources/input_formats/mod.rs | 24 ++ .../input_formats/processor_aligner.rs | 150 +++++++++ .../input_formats/processor_deserializer.rs | 121 +++++++ .../sources/src/processors/sources/mod.rs | 1 + .../src/storages/stage/stage_source.rs | 1 + 15 files changed, 1496 insertions(+) create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/processor_aligner.rs create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/processor_deserializer.rs diff --git a/Cargo.lock b/Cargo.lock index 429fb758e1af..6cc3f08d223e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1672,21 +1672,26 @@ dependencies = [ name = "common-pipeline-sources" version = "0.1.0" dependencies = [ + "async-channel", "async-trait-fn", + "common-arrow", "common-base", "common-catalog", "common-datablocks", + "common-datavalues", "common-exception", "common-formats", "common-io", "common-meta-types", "common-pipeline-core", + "common-settings", "common-storage", "common-streams", "futures", "futures-util", "opendal", "parking_lot 0.12.1", + "tracing", ] [[package]] diff --git a/src/query/pipeline/sources/Cargo.toml b/src/query/pipeline/sources/Cargo.toml index ea2485c080db..17372c50b70a 100644 --- a/src/query/pipeline/sources/Cargo.toml +++ b/src/query/pipeline/sources/Cargo.toml @@ -9,17 +9,22 @@ doctest = false test = false [dependencies] +async-channel = "1.7.1" +common-arrow = { path = "../../../common/arrow" } common-base = { path = "../../../common/base" } common-catalog = { path = "../../catalog" } common-datablocks = { path = "../../datablocks" } +common-datavalues = { path = "../../datavalues" } common-exception = { path = "../../../common/exception" } common-formats = { path = "../../formats" } common-io = { path = "../../../common/io" } common-meta-types = { path = "../../../meta/types" } common-pipeline-core = { path = "../core" } +common-settings = { path = "../../settings" } 
 common-storage = { path = "../../../common/storage" }
 common-streams = { path = "../../streams" }
+tracing = "0.1.35"

 async-trait = { version = "0.1.0", package = "async-trait-fn" }
 futures = "0.3.21"
 futures-util = "0.3.21"
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs
new file mode 100644
index 000000000000..216a55da7898
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/delimiter.rs
@@ -0,0 +1,50 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_exception::ErrorCode;
+use common_exception::Result;
+
+pub enum RecordDelimiter {
+    CRLF,
+    Any(u8),
+}
+
+impl RecordDelimiter {
+    pub fn end(&self) -> u8 {
+        match self {
+            RecordDelimiter::CRLF => b'\n',
+            RecordDelimiter::Any(b) => *b,
+        }
+    }
+}
+
+impl TryFrom<&str> for RecordDelimiter {
+    type Error = ErrorCode;
+
+    fn try_from(s: &str) -> Result<Self> {
+        match s.len() {
+            1 => Ok(RecordDelimiter::Any(s.as_bytes()[0])),
+            2 if s.eq("\r\n") => Ok(RecordDelimiter::CRLF),
+            _ => Err(ErrorCode::InvalidArgument(format!(
+                "bad RecordDelimiter: '{}'",
+                s
+            ))),
+        }
+    }
+}
+
+impl Default for RecordDelimiter {
+    fn default() -> Self {
+        RecordDelimiter::CRLF
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
new file mode 100644
index 000000000000..fa3669cda8d9
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
@@ -0,0 +1,138 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
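The delimiter module above accepts either a single byte or the exact two-byte sequence "\r\n", and `end()` deliberately maps CRLF to b'\n' so the aligner only ever scans for one terminating byte. A test one could add to delimiter.rs to pin that behavior down (illustrative only, not part of this patch; assumes Rust 2021 so `TryFrom` is in the prelude):

```rust
#[cfg(test)]
mod tests {
    use super::RecordDelimiter;

    #[test]
    fn parse_record_delimiter() {
        // a single byte becomes Any(b)
        assert_eq!(RecordDelimiter::try_from("|").unwrap().end(), b'|');
        // exactly "\r\n" becomes CRLF; its end byte is still '\n'
        assert_eq!(RecordDelimiter::try_from("\r\n").unwrap().end(), b'\n');
        // everything else is rejected with InvalidArgument
        assert!(RecordDelimiter::try_from("ab").is_err());
        assert!(RecordDelimiter::try_from("").is_err());
    }
}
```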
+ +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use common_arrow::parquet::metadata::RowGroupMetaData; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::Object; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; +use crate::processors::sources::input_formats::InputFormat; + +struct InputFormatParquet; + +#[async_trait::async_trait] +impl InputFormat for InputFormatParquet { + async fn read_file_meta( + &self, + obj: &Object, + size: usize, + ) -> Result>> { + todo!() + } + + async fn read_split_meta( + &self, + obj: &Object, + split_info: &SplitInfo, + ) -> Result>> { + todo!() + } + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { + todo!() + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + todo!() + } + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + todo!() + } +} + +pub struct ParquetFormatPipe; + +#[async_trait::async_trait] +impl InputFormatPipe for ParquetFormatPipe { + type ReadBatch = ReadBatch; + type RowBatch = RowGroupInMemory; + type AligningState = AligningState; + type BlockBuilder = ParquetBlockBuilder; +} + +pub struct SplitMeta { + row_groups: Vec, +} + +pub struct RowGroupInMemory {} + +impl Debug for RowGroupInMemory { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "RowGroupInMemory") + } +} + +#[derive(Debug)] +pub enum ReadBatch { + Buffer(Vec), + RowGroup(RowGroupInMemory), +} + +impl From> for ReadBatch { + fn from(v: Vec) -> Self { + Self::Buffer(v) + } +} + +pub struct ParquetBlockBuilder { + ctx: Arc, +} + +impl BlockBuilderTrait for ParquetBlockBuilder { + type Pipe = ParquetFormatPipe; + + fn create(ctx: Arc) -> Self { + ParquetBlockBuilder { ctx } + } + + fn deserialize(&mut self, batch: Option) -> Result> { + todo!() + } +} + +pub struct AligningState { + buffers: Vec>, +} + +impl AligningStateTrait for AligningState { + type Pipe = ParquetFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + todo!() + } + + fn align(&mut self, read_batch: Option) -> Result> { + todo!() + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs new file mode 100644 index 000000000000..94f4c92db377 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -0,0 +1,116 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_datavalues::TypeDeserializer;
+use common_datavalues::TypeDeserializerImpl;
+use common_exception::ErrorCode;
+use common_exception::Result;
+use common_io::prelude::BufferReadExt;
+use common_io::prelude::FormatSettings;
+use common_io::prelude::NestedCheckpointReader;
+
+use crate::processors::sources::input_formats::input_format_text::AligningState;
+use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
+use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
+use crate::processors::sources::input_formats::input_format_text::RowBatch;
+
+pub struct InputFormatTSV {}
+
+impl InputFormatTSV {
+    fn read_row(
+        buf: &[u8],
+        deserializers: &mut Vec<TypeDeserializerImpl>,
+        format_settings: &FormatSettings,
+        path: &str,
+        offset: usize,
+        row_index: Option<usize>,
+    ) -> Result<()> {
+        let num_columns = deserializers.len();
+        // rows handed over by the aligner still end with the record
+        // delimiter; strip it before splitting fields
+        let mut buf = buf;
+        if buf.last() == Some(&b'\n') {
+            buf = &buf[..buf.len() - 1];
+            if buf.last() == Some(&b'\r') {
+                buf = &buf[..buf.len() - 1];
+            }
+        }
+        let mut n_col = 0;
+        let mut field_start = 0;
+        let mut pos = 0;
+        let mut err_msg = None;
+        // treat end-of-row as the terminator of the last field, so the
+        // column after the final '\t' is deserialized too
+        while pos <= buf.len() {
+            if pos == buf.len() || buf[pos] == b'\t' {
+                if n_col >= num_columns {
+                    err_msg = Some("too many columns".to_string());
+                    break;
+                }
+                let col_data = &buf[field_start..pos];
+                if col_data.is_empty() {
+                    deserializers[n_col].de_default(format_settings);
+                } else {
+                    let mut reader = NestedCheckpointReader::new(col_data);
+                    reader.ignores(|c: u8| c == b' ').expect("must success");
+                    if let Err(e) = deserializers[n_col].de_text(&mut reader, format_settings) {
+                        err_msg = Some(format!("fail to decode column {}: {:?}", n_col, e));
+                        break;
+                    };
+                    // todo(youngsofun): check remaining data
+                }
+                n_col += 1;
+                field_start = pos + 1;
+            }
+            pos += 1;
+        }
+        if err_msg.is_none() && n_col < num_columns {
+            // todo(youngsofun): allow it optionally (set default)
+            err_msg = Some(format!("need {} columns, find {} only", num_columns, n_col));
+        }
+        if let Some(m) = err_msg {
+            let row_info = if let Some(r) = row_index {
+                format!("at row {},", r)
+            } else {
+                String::new()
+            };
+            let msg = format!(
+                "fail to parse tsv {} at offset {}, {} reason={}",
+                path,
+                offset + pos,
+                row_info,
+                m
+            );
+            Err(ErrorCode::BadBytes(msg))
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl InputFormatTextBase for InputFormatTSV {
+    fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
+        let columns = &mut builder.mutable_columns;
+        let mut start = 0usize;
+        let start_row = batch.start_row;
+        for (i, end) in batch.row_ends.iter().enumerate() {
+            let buf = &batch.data[start..*end];
+            Self::read_row(
+                buf,
+                columns,
+                &builder.ctx.format_settings,
+                &batch.path,
+                batch.offset + start,
+                start_row.map(|n| n + i),
+            )?;
+            start = *end;
+        }
+        Ok(())
+    }
+
+    fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
+        Ok(state.align_by_row_delimiter(buf))
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs
new file mode 100644
index 000000000000..73d07cc71c1a
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/mod.rs
@@ -0,0 +1,16 @@
+// Copyright 2022 Datafuse Labs.
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod input_format_parquet; +pub mod input_format_tsv; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs new file mode 100644 index 000000000000..ff47e357ab94 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -0,0 +1,188 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::base::tokio::sync::Mutex; +use common_base::base::Progress; +use common_datavalues::DataSchemaRef; +use common_exception::ErrorCode; +use common_exception::Result; +use common_io::prelude::FormatSettings; +use common_meta_types::StageFileCompression; +use common_meta_types::StageFileFormatType; +use common_meta_types::UserStageInfo; +use common_settings::Settings; +use opendal::io_util::CompressAlgorithm; +use opendal::Operator; + +use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_format_text::InputFormatText; +use crate::processors::sources::input_formats::InputFormat; + +pub enum InputPlan { + CopyInto(Box), + StreamingLoad, + ClickHouseInsert, +} + +pub struct CopyIntoPlan { + pub stage_info: UserStageInfo, + pub files: Vec, +} + +pub struct InputProgress { + // todo(youngsofun): add write progress and errors + scan_progress: Progress, + size: usize, +} + +pub struct InputContext { + pub plan: InputPlan, + pub schema: DataSchemaRef, + pub operator: Operator, + pub format: Arc, + pub splits: Vec, + + // row format only + pub rows_to_skip: usize, + pub field_delimiter: u8, + pub record_delimiter: RecordDelimiter, + + // runtime config + pub settings: Settings, + pub format_settings: FormatSettings, + + pub read_batch_size: usize, + pub rows_per_block: usize, + + pub progress_total: Mutex>>, + pub progress_by_file: Mutex>>, +} + +impl InputContext { + pub fn get_input_format(format: &StageFileFormatType) -> Result> { + match format { + StageFileFormatType::Tsv => Ok(Arc::new(InputFormatText::::create())), + format => Err(ErrorCode::LogicalError(format!( + "Unsupported file format: {:?}", + 
format + ))), + } + } + + async fn try_create_from_copy( + operator: Operator, + settings: Settings, + format_settings: FormatSettings, + schema: DataSchemaRef, + plan: CopyIntoPlan, + ) -> Result { + let read_batch_size = 1024 * 1024; + let split_size = 128usize * 1024 * 1024; + let file_format_options = &plan.stage_info.file_format_options; + let format = Self::get_input_format(&file_format_options.format)?; + let files = Self::get_file_infos(&format, &operator, &plan).await?; + let splits = format.split_files(files, split_size); + let rows_per_block = settings.get_max_block_size()? as usize; + let record_delimiter = + RecordDelimiter::try_from(file_format_options.record_delimiter.as_str())?; + Ok(InputContext { + format, + schema, + operator, + splits, + settings, + format_settings, + record_delimiter, + rows_per_block, + read_batch_size, + rows_to_skip: file_format_options.skip_header as usize, + field_delimiter: file_format_options.field_delimiter.as_bytes()[0], + progress_total: Default::default(), + plan: InputPlan::CopyInto(Box::new(plan)), + progress_by_file: Default::default(), + }) + } + + async fn get_file_infos( + format: &Arc, + op: &Operator, + plan: &CopyIntoPlan, + ) -> Result> { + let mut infos = vec![]; + for p in &plan.files { + let obj = op.object(p); + let size = obj.metadata().await?.content_length() as usize; + let file_meta = format.read_file_meta(&obj, size).await?; + let compress_alg = InputContext::get_compression_alg_copy( + plan.stage_info.file_format_options.compression, + p, + )?; + let info = FileInfo { + path: p.clone(), + size, + compress_alg, + file_meta, + }; + infos.push(info) + } + Ok(infos) + } + + pub fn num_prefetch_splits(&self) -> Result { + Ok(self.settings.get_max_threads()? as usize) + } + + pub fn num_prefetch_per_split(&self) -> usize { + 1 + } + + pub fn get_compression_alg(&self, path: &str) -> Result> { + let opt = match &self.plan { + InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression, + _ => StageFileCompression::None, + }; + Self::get_compression_alg_copy(opt, path) + } + + pub fn get_compression_alg_copy( + compress_option: StageFileCompression, + path: &str, + ) -> Result> { + let compression_algo = match compress_option { + StageFileCompression::Auto => CompressAlgorithm::from_path(path), + StageFileCompression::Gzip => Some(CompressAlgorithm::Gzip), + StageFileCompression::Bz2 => Some(CompressAlgorithm::Bz2), + StageFileCompression::Brotli => Some(CompressAlgorithm::Brotli), + StageFileCompression::Zstd => Some(CompressAlgorithm::Zstd), + StageFileCompression::Deflate => Some(CompressAlgorithm::Zlib), + StageFileCompression::RawDeflate => Some(CompressAlgorithm::Deflate), + StageFileCompression::Xz => Some(CompressAlgorithm::Xz), + StageFileCompression::Lzo => { + return Err(ErrorCode::UnImplement("compress type lzo is unimplemented")); + } + StageFileCompression::Snappy => { + return Err(ErrorCode::UnImplement( + "compress type snappy is unimplemented", + )); + } + StageFileCompression::None => None, + }; + Ok(compression_algo) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs new file mode 100644 index 000000000000..e594a0aea386 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs @@ -0,0 +1,123 @@ +// Copyright 2022 Datafuse Labs. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::cmp::min; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::io_util::CompressAlgorithm; +use opendal::Object; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputData: Send + Sync + 'static { + fn as_any(&self) -> &dyn Any; +} + +pub trait InputState: Send { + fn as_any(&mut self) -> &mut dyn Any; +} + +#[async_trait::async_trait] +pub trait InputFormat: Send + Sync { + async fn read_file_meta(&self, obj: &Object, size: usize) + -> Result>>; + + async fn read_split_meta( + &self, + obj: &Object, + split_info: &SplitInfo, + ) -> Result>>; + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec; + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()>; + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()>; +} + +#[derive(Clone)] +pub struct FileInfo { + pub path: String, + pub size: usize, + pub compress_alg: Option, + pub file_meta: Option>, +} + +impl FileInfo { + pub fn split_by_size(&self, split_size: usize) -> Vec { + let mut splits = vec![]; + let n = (self.size + split_size - 1) / split_size; + for i in 0..n - 1 { + splits.push(SplitInfo { + file_info: self.clone(), + seq_infile: i, + is_end: i == n - 1, + offset: i * split_size, + len: min((i + 1) * split_size, self.size), + split_meta: None, + }) + } + splits + } +} + +#[derive(Clone)] +pub struct SplitInfo { + pub file_info: FileInfo, + pub seq_infile: usize, + pub is_end: bool, + pub offset: usize, + pub len: usize, + pub split_meta: Option>, +} + +impl SplitInfo { + pub fn from_file_info(file_info: FileInfo) -> Self { + let len = file_info.size; + Self { + file_info, + seq_infile: 0, + is_end: true, + offset: 0, + len, + split_meta: None, + } + } + + pub fn from_stream_split(path: String) -> Self { + SplitInfo { + file_info: FileInfo { + path, + size: 0, + compress_alg: None, + file_meta: None, + }, + seq_infile: 0, + offset: 0, + len: 0, + is_end: false, + split_meta: None, + } + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs new file mode 100644 index 000000000000..770c87c067ac --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -0,0 +1,307 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::marker::PhantomData; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_datablocks::DataBlock; +use common_datavalues::TypeDeserializer; +use common_datavalues::TypeDeserializerImpl; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::Pipeline; +use opendal::io_util::DecompressDecoder; +use opendal::io_util::DecompressState; +use opendal::Object; + +use super::InputFormat; +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::FileInfo; +use crate::processors::sources::input_formats::input_format::InputData; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; + +pub trait InputFormatTextBase: Sized + Send + Sync + 'static { + fn is_splittable() -> bool { + false + } + fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()>; + + fn align(state: &mut AligningState, buf: &[u8]) -> Result>; +} + +pub struct InputFormatText { + phantom: PhantomData, +} + +impl InputFormatText { + pub fn create() -> Self { + Self { + phantom: Default::default(), + } + } +} + +pub struct InputFormatTextPipe { + phantom: PhantomData, +} + +#[async_trait::async_trait] +impl InputFormatPipe for InputFormatTextPipe { + type ReadBatch = Vec; + type RowBatch = RowBatch; + type AligningState = AligningState; + type BlockBuilder = BlockBuilder; +} + +#[async_trait::async_trait] +impl InputFormat for InputFormatText { + async fn read_file_meta( + &self, + _obj: &Object, + _size: usize, + ) -> Result>> { + Ok(None) + } + + async fn read_split_meta( + &self, + _obj: &Object, + _split_info: &SplitInfo, + ) -> Result>> { + Ok(None) + } + + fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { + let mut splits = vec![]; + for f in file_infos { + if f.compress_alg.is_none() || !T::is_splittable() { + splits.push(SplitInfo::from_file_info(f)) + } else { + splits.append(&mut f.split_by_size(split_size)) + } + } + splits + } + + fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + InputFormatTextPipe::::execute_copy_with_aligner(ctx, pipeline) + } + + fn exec_stream( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + input: Receiver, + ) -> Result<()> { + InputFormatTextPipe::::execute_stream(ctx, pipeline, input) + } +} + +#[derive(Default)] +pub struct RowBatch { + pub data: Vec, + pub row_ends: Vec, + pub field_ends: Vec, + + // for error info + pub path: String, + pub offset: usize, + pub start_row: Option, +} + +pub struct AligningState { + pub path: String, + pub record_delimiter_end: u8, + pub field_delimiter: u8, + pub offset: usize, + pub rows_to_skip: usize, + pub tail_of_last_batch: Vec, + + pub decoder: Option, + phantom: 
PhantomData<T>,
+}
+
+impl<T: InputFormatTextBase> AligningState<T> {
+    pub fn align_by_row_delimiter(&mut self, buf: &[u8]) -> Vec<RowBatch> {
+        let size_last_remain = self.tail_of_last_batch.len();
+        let mut output = RowBatch::default();
+        let rows = &mut output.row_ends;
+        for (i, b) in buf.iter().enumerate() {
+            if *b == self.record_delimiter_end {
+                // row ends index into output.data, which starts with the
+                // tail carried over from the last batch
+                rows.push(i + 1 + size_last_remain)
+            }
+        }
+        if rows.is_empty() {
+            // no complete row in this batch: keep buffering
+            self.tail_of_last_batch.extend_from_slice(buf);
+            vec![]
+        } else {
+            let batch_end = rows[rows.len() - 1] - size_last_remain;
+            output.data = mem::take(&mut self.tail_of_last_batch);
+            output.data.extend_from_slice(&buf[..batch_end]);
+            // bytes after the last delimiter start the next partial row
+            self.tail_of_last_batch.extend_from_slice(&buf[batch_end..]);
+            let size = output.data.len();
+            output.path = self.path.to_string();
+            output.offset = self.offset;
+            self.offset += size;
+            vec![output]
+        }
+    }
+
+    fn flush(&mut self) -> Vec<RowBatch> {
+        // the last row may lack a trailing record delimiter
+        let data = mem::take(&mut self.tail_of_last_batch);
+        if data.is_empty() {
+            return vec![];
+        }
+        let end = data.len();
+        let row_batch = RowBatch {
+            data,
+            row_ends: vec![end],
+            field_ends: vec![],
+            path: self.path.to_string(),
+            offset: self.offset,
+            start_row: None,
+        };
+        vec![row_batch]
+    }
+}
+
+impl<T: InputFormatTextBase> AligningStateTrait for AligningState<T> {
+    type Pipe = InputFormatTextPipe<T>;
+
+    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
+        let rows_to_skip = if split_info.seq_infile == 0 {
+            ctx.rows_to_skip
+        } else {
+            0
+        };
+        let path = split_info.file_info.path.clone();
+
+        let decoder = ctx.get_compression_alg(&path)?.map(DecompressDecoder::new);
+
+        Ok(AligningState::<T> {
+            offset: split_info.offset,
+            record_delimiter_end: ctx.record_delimiter.end(),
+            field_delimiter: ctx.field_delimiter,
+            path,
+            decoder,
+            rows_to_skip,
+            tail_of_last_batch: vec![],
+            phantom: Default::default(),
+        })
+    }
+
+    fn align(&mut self, read_batch: Option<Vec<u8>>) -> Result<Vec<RowBatch>> {
+        let row_batches = if let Some(data) = read_batch {
+            let buf = if let Some(decoder) = self.decoder.as_mut() {
+                decompress(decoder, &data)?
+            } else {
+                data
+            };
+            T::align(self, &buf)?
+ } else { + if let Some(decoder) = &self.decoder { + assert_eq!(decoder.state(), DecompressState::Done) + } + self.flush() + }; + Ok(row_batches) + } +} + +pub struct BlockBuilder { + pub ctx: Arc, + pub mutable_columns: Vec, + pub num_rows: usize, + phantom: PhantomData, +} + +impl BlockBuilder { + fn flush(&mut self) -> Result> { + let mut columns = Vec::with_capacity(self.mutable_columns.len()); + for deserializer in &mut self.mutable_columns { + columns.push(deserializer.finish_to_column()); + } + self.mutable_columns = self + .ctx + .schema + .create_deserializers(self.ctx.rows_per_block); + self.num_rows = 0; + + Ok(vec![DataBlock::create(self.ctx.schema.clone(), columns)]) + } +} + +impl BlockBuilderTrait for BlockBuilder { + type Pipe = InputFormatTextPipe; + + fn create(ctx: Arc) -> Self { + let columns = ctx.schema.create_deserializers(ctx.rows_per_block); + BlockBuilder { + ctx, + mutable_columns: columns, + num_rows: 0, + phantom: Default::default(), + } + } + + fn deserialize(&mut self, batch: Option) -> Result> { + if let Some(b) = batch { + self.num_rows += b.row_ends.len(); + T::deserialize(self, b)?; + if self.num_rows >= self.ctx.rows_per_block { + self.flush() + } else { + Ok(vec![]) + } + } else { + self.flush() + } + } +} + +fn decompress(decoder: &mut DecompressDecoder, compressed: &[u8]) -> Result> { + let mut decompress_bufs = vec![]; + let mut amt = 0; + loop { + match decoder.state() { + DecompressState::Reading => { + if amt == compressed.len() { + break; + } + let read = decoder.fill(&compressed[amt..]); + amt += read; + } + DecompressState::Decoding => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.decode(&mut decompress_buf[..]).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Flushing => { + let mut decompress_buf = vec![0u8; 4096]; + let written = decoder.finish(&mut decompress_buf).map_err(|e| { + ErrorCode::InvalidCompressionData(format!("compression data invalid: {e}")) + })?; + decompress_buf.truncate(written); + decompress_bufs.push(decompress_buf); + } + DecompressState::Done => break, + } + } + Ok(decompress_bufs.concat()) +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs new file mode 100644 index 000000000000..221ebf6ca526 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -0,0 +1,251 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
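The core contract of `align_by_row_delimiter` above: emit only complete rows, carry the partial tail into the next call, and keep `row_ends` indexed into the emitted buffer. The same logic in standalone form (a simplified sketch with a fixed b'\n' delimiter; `align` here is an illustrative free function, not the patch's method):

```rust
/// Split `buf` into complete '\n'-terminated rows, carrying any
/// trailing partial row in `tail` across calls.
fn align(tail: &mut Vec<u8>, buf: &[u8]) -> Option<Vec<u8>> {
    match buf.iter().rposition(|b| *b == b'\n') {
        None => {
            // no complete row yet: keep buffering
            tail.extend_from_slice(buf);
            None
        }
        Some(last) => {
            // emit tail + everything up to and including the last '\n'
            let mut out = std::mem::take(tail);
            out.extend_from_slice(&buf[..=last]);
            // the remainder starts the next partial row
            tail.extend_from_slice(&buf[last + 1..]);
            Some(out)
        }
    }
}

fn main() {
    let mut tail = Vec::new();
    assert_eq!(align(&mut tail, b"a\tb\nc"), Some(b"a\tb\n".to_vec()));
    assert_eq!(tail, b"c"); // partial row carried over
    assert_eq!(align(&mut tail, b"\td\n"), Some(b"c\td\n".to_vec()));
    assert!(align(&mut tail, b"no newline").is_none());
}
```

Losing the bytes after the last delimiter, or emitting a row when the tail is empty at flush, are exactly the failure modes the patched aligner has to avoid.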
+ +use std::fmt::Debug; +use std::sync::Arc; + +use common_base::base::tokio; +use common_base::base::tokio::sync::mpsc::Receiver; +use common_base::base::tokio::sync::mpsc::Sender; +use common_datablocks::DataBlock; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::Pipeline; +use common_pipeline_core::SourcePipeBuilder; +use futures_util::stream::FuturesUnordered; +use futures_util::AsyncReadExt; +use futures_util::StreamExt; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::processor_aligner::Aligner; +use crate::processors::sources::input_formats::processor_deserializer::DeserializeProcessor; + +pub struct Split { + pub(crate) info: SplitInfo, + pub(crate) rx: Receiver, +} + +pub struct StreamingSplit { + path: String, + data_tx: Sender, +} + +pub struct StreamingReadBatch { + data: Vec, + pub(crate) path: String, + pub(crate) is_start: bool, +} + +pub trait AligningStateTrait: Sized { + type Pipe: InputFormatPipe; + + fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result; + + fn align( + &mut self, + read_batch: Option<::ReadBatch>, + ) -> Result::RowBatch>>; +} + +pub trait BlockBuilderTrait { + type Pipe: InputFormatPipe; + fn create(ctx: Arc) -> Self; + + fn deserialize( + &mut self, + batch: Option<::RowBatch>, + ) -> Result>; +} + +#[async_trait::async_trait] +pub trait InputFormatPipe: Sized + Send + 'static { + type ReadBatch: From> + Send + Debug; + type RowBatch: Send; + type AligningState: AligningStateTrait + Send; + type BlockBuilder: BlockBuilderTrait + Send; + + fn execute_stream( + ctx: Arc, + pipeline: &mut Pipeline, + mut input: Receiver, + ) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + tokio::spawn(async move { + let mut sender: Option> = None; + while let Some(batch) = input.recv().await { + if batch.is_start { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(1); + sender = Some(data_tx); + let split_info = SplitInfo::from_stream_split(batch.path.clone()); + split_tx + .send(Split { + info: split_info, + rx: data_rx, + }) + .await + .expect("fail to send split from stream"); + } + if let Some(s) = sender.as_mut() { + s.send(batch.data.into()) + .await + .expect("fail to send read batch from stream"); + } + } + }); + Ok(()) + } + + fn execute_copy_with_aligner(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + tokio::spawn(async move { + for s in &ctx_clone.splits { + let (data_tx, data_rx) = tokio::sync::mpsc::channel(ctx.num_prefetch_per_split()); + let split_clone = s.clone(); + let ctx_clone2 = ctx_clone.clone(); + tokio::spawn(async move { + if let Err(e) = + Self::copy_reader_with_aligner(ctx_clone2, split_clone, data_tx).await + { + tracing::error!("copy error: {:?}", e); + } + }); + if split_tx + .send(Split { + info: s.clone(), + rx: data_rx, + }) + .await + .is_err() + { + break; + }; + } + }); + Ok(()) + } + + fn execute_copy_aligned(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { + let (data_tx, data_rx) = 
async_channel::bounded(ctx.num_prefetch_splits()?); + Self::build_pipeline_aligned(&ctx, data_rx, pipeline)?; + + let ctx_clone = ctx.clone(); + let p = 3; + tokio::spawn(async move { + let mut futs = FuturesUnordered::new(); + for s in &ctx_clone.splits { + let fut = Self::read_split(ctx_clone.clone(), s.clone()); + futs.push(fut); + if futs.len() >= p { + let row_batch = futs.next().await.unwrap().unwrap(); + data_tx.send(row_batch).await.unwrap(); + } + } + + while let Some(row_batch) = futs.next().await { + data_tx.send(row_batch.unwrap()).await.unwrap(); + } + }); + Ok(()) + } + + fn build_pipeline_aligned( + ctx: &Arc, + row_batch_rx: async_channel::Receiver, + pipeline: &mut Pipeline, + ) -> Result<()> { + let mut builder = SourcePipeBuilder::create(); + for _ in 0..ctx.settings.get_max_threads()? { + let output = OutputPort::create(); + let input = InputPort::create(); + let source = DeserializeProcessor::::create( + ctx.clone(), + input, + output.clone(), + row_batch_rx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + Ok(()) + } + + fn build_pipeline_with_aligner( + ctx: &Arc, + split_rx: async_channel::Receiver>, + pipeline: &mut Pipeline, + ) -> Result<()> { + let (row_batch_tx, row_batch_rx) = async_channel::bounded(10); + let mut builder = SourcePipeBuilder::create(); + for _ in 0..ctx.settings.get_max_threads()? { + let output = OutputPort::create(); + let source = Aligner::::try_create( + output.clone(), + ctx.clone(), + split_rx.clone(), + row_batch_tx.clone(), + )?; + builder.add_source(output, source); + } + pipeline.add_pipe(builder.finalize()); + pipeline.add_transform(|input, output| { + DeserializeProcessor::::create(ctx.clone(), input, output, row_batch_rx.clone()) + })?; + Ok(()) + } + + async fn read_split(_ctx: Arc, _split_info: SplitInfo) -> Result { + unimplemented!() + } + + async fn copy_reader_with_aligner( + ctx: Arc, + split_info: SplitInfo, + batch_tx: Sender, + ) -> Result<()> { + let object = ctx.operator.object(&split_info.file_info.path); + let offset = split_info.offset as u64; + let mut reader = object.range_reader(offset..).await?; + loop { + let mut batch = vec![0u8; ctx.read_batch_size]; + + let mut buf = &mut batch[0..]; + let mut n = 0; + while !buf.is_empty() { + let read = reader.read(buf).await?; + if read == 0 { + break; + } + n += read; + buf = &mut batch[n..] + } + if n == 0 { + break; + } else { + batch_tx + .send(batch.into()) + .await + .map_err(|_| ErrorCode::UnexpectedError("fail to send ReadBatch"))?; + } + } + Ok(()) + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs new file mode 100644 index 000000000000..444adc8c5006 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
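`execute_copy_with_aligner` above fans splits out through an `async_channel`, whose receivers can be cloned so that `max_threads` Aligner sources pull from one bounded queue. A minimal sketch of that MPMC fan-out pattern (illustrative names; assumes only the `tokio` and `async-channel` crates this patch already depends on):

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    // one producer, N consumers sharing clones of the same bounded receiver
    let (tx, rx) = async_channel::bounded::<usize>(2);
    for worker in 0..4 {
        let rx = rx.clone();
        tokio::spawn(async move {
            // recv() returns Err once the channel is closed and drained
            while let Ok(split_id) = rx.recv().await {
                println!("worker {worker} aligns split {split_id}");
            }
        });
    }
    for split_id in 0..8 {
        tx.send(split_id).await.unwrap();
    }
    drop(tx); // close the channel so all workers exit their loops
    tokio::time::sleep(Duration::from_millis(100)).await; // let workers drain
}
```

The bounded capacity is what gives the pipeline backpressure: a slow aligner stalls the split producer instead of letting it buffer unboundedly.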
+ +mod delimiter; +mod impls; +mod input_context; +mod input_format; +mod input_format_text; +mod input_pipeline; +mod processor_aligner; +mod processor_deserializer; + +pub use input_format::InputFormat; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/processor_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/processor_aligner.rs new file mode 100644 index 000000000000..2ec99dacb0c3 --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/processor_aligner.rs @@ -0,0 +1,150 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::mem; +use std::sync::Arc; + +use common_base::base::tokio::sync::mpsc::Receiver; +use common_exception::ErrorCode; +use common_exception::Result; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::Event; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_core::processors::Processor; + +use crate::processors::sources::input_formats::input_context::InputContext; +use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; +use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_pipeline::Split; + +pub struct Aligner { + ctx: Arc, + #[allow(unused)] + output: Arc, + + // input + split_rx: async_channel::Receiver>, + + state: Option, + batch_rx: Option>, + read_batch: Option, + + end_batch: bool, + no_more_split: bool, + + // output + row_batches: VecDeque, + row_batch_tx: async_channel::Sender, +} + +impl Aligner { + pub(crate) fn try_create( + output: Arc, + ctx: Arc, + split_rx: async_channel::Receiver>, + batch_tx: async_channel::Sender, + ) -> Result { + Ok(ProcessorPtr::create(Box::new(Self { + ctx, + output, + split_rx, + row_batch_tx: batch_tx, + state: None, + read_batch: None, + batch_rx: None, + end_batch: false, + no_more_split: false, + row_batches: Default::default(), + }))) + } +} + +#[async_trait::async_trait] +impl Processor for Aligner { + fn name(&self) -> &'static str { + "Aligner" + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.no_more_split && self.row_batches.is_empty() && self.read_batch.is_none() { + Ok(Event::Finished) + } else if self.row_batches.is_empty() { + Ok(Event::Async) + } else if self.read_batch.is_some() { + Ok(Event::Sync) + } else { + Ok(Event::Async) + } + } + + fn process(&mut self) -> Result<()> { + match &mut self.state { + Some(state) => { + let read_batch = mem::take(&mut self.read_batch); + let eof = read_batch.is_none(); + let row_batches = state.align(read_batch)?; + for b in row_batches.into_iter() { + self.row_batches.push_back(b); + } + if eof { + self.state = None; + self.batch_rx = None; + } else { + self.read_batch = None; + } + Ok(()) + } + _ => Err(ErrorCode::UnexpectedError("Aligner 
process state is none")),
+        }
+    }
+
+    async fn async_process(&mut self) -> Result<()> {
+        while let Some(rb) = self.row_batches.pop_front() {
+            self.row_batch_tx
+                .send(rb)
+                .await
+                .map_err(|_| ErrorCode::UnexpectedError("fail to send row batch"))?;
+        }
+
+        if !self.no_more_split {
+            if self.state.is_none() {
+                match self.split_rx.recv().await {
+                    Ok(split) => {
+                        self.state = Some(I::AligningState::try_create(&self.ctx, &split.info)?);
+                        self.batch_rx = Some(split.rx);
+                        self.end_batch = false;
+                    }
+                    Err(_) => {
+                        self.no_more_split = true;
+                    }
+                }
+            }
+            if let Some(rx) = self.batch_rx.as_mut() {
+                match rx.recv().await {
+                    Some(batch) => self.read_batch = Some(batch),
+                    None => {
+                        self.end_batch = true;
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/processor_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/processor_deserializer.rs
new file mode 100644
index 000000000000..24d6ac1ff429
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/processor_deserializer.rs
@@ -0,0 +1,121 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use common_datablocks::DataBlock;
+use common_exception::Result;
+use common_pipeline_core::processors::port::InputPort;
+use common_pipeline_core::processors::port::OutputPort;
+use common_pipeline_core::processors::processor::Event;
+use common_pipeline_core::processors::processor::ProcessorPtr;
+use common_pipeline_core::processors::Processor;
+
+use crate::processors::sources::input_formats::input_context::InputContext;
+use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
+use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
+
+pub struct DeserializeProcessor<I: InputFormatPipe> {
+    #[allow(unused)]
+    input: Arc<InputPort>,
+    output: Arc<OutputPort>,
+
+    block_builder: I::BlockBuilder,
+    input_rx: async_channel::Receiver<I::RowBatch>,
+    input_buffer: Option<I::RowBatch>,
+    input_finished: bool,
+    output_buffer: VecDeque<DataBlock>,
+}
+
+impl<I: InputFormatPipe> DeserializeProcessor<I> {
+    pub(crate) fn create(
+        ctx: Arc<InputContext>,
+        input: Arc<InputPort>,
+        output: Arc<OutputPort>,
+        rx: async_channel::Receiver<I::RowBatch>,
+    ) -> Result<ProcessorPtr> {
+        Ok(ProcessorPtr::create(Box::new(Self {
+            block_builder: I::BlockBuilder::create(ctx),
+            input,
+            output,
+            input_rx: rx,
+            input_buffer: Default::default(),
+            input_finished: false,
+            output_buffer: Default::default(),
+        })))
+    }
+}
+
+#[async_trait::async_trait]
+impl<I: InputFormatPipe> Processor for DeserializeProcessor<I> {
+    fn name(&self) -> &'static str {
+        "Deserializer"
+    }
+
+    fn as_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn event(&mut self) -> Result<Event> {
+        if self.output.is_finished() {
+            self.input_buffer = None;
+            self.input_finished = true;
+            Ok(Event::Finished)
+        } else if !self.output.can_push() {
+            // downstream is full: wait for it to consume
+            self.input.set_not_need_data();
+            Ok(Event::NeedConsume)
+        } else {
+            match self.output_buffer.pop_front() {
+                None => {
+                    if self.input_buffer.is_some() {
+                        Ok(Event::Sync)
+                    } else {
+                        Ok(Event::Async)
+                    }
+                }
+                Some(data_block) => {
+                    self.output.push_data(Ok(data_block));
+                    Ok(Event::NeedConsume)
+                }
+            }
+        }
+    }
+
+    fn process(&mut self) -> Result<()> {
+        if self.input_finished {
+            assert!(self.input_buffer.is_none());
+        }
+        let blocks = self.block_builder.deserialize(self.input_buffer.take())?;
+        for b in blocks.into_iter() {
+            self.output_buffer.push_back(b)
+        }
+        Ok(())
+    }
+
+    async fn async_process(&mut self) -> Result<()> {
+        assert!(self.input_buffer.is_none() && !self.input_finished);
+        match self.input_rx.recv().await {
+            Ok(row_batch) => {
+                self.input_buffer = Some(row_batch);
+            }
+            Err(_) => {
+                self.input_finished = true;
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/mod.rs b/src/query/pipeline/sources/src/processors/sources/mod.rs
index 4c20fb571d7b..5246c76a6287 100644
--- a/src/query/pipeline/sources/src/processors/sources/mod.rs
+++ b/src/query/pipeline/sources/src/processors/sources/mod.rs
@@ -17,6 +17,7 @@ pub mod blocks_source;
 pub mod deserializer;
 pub mod empty_source;
 pub mod file_splitter;
+pub mod input_formats;
 pub mod multi_file_splitter;
 mod one_block_source;
 pub mod stream_source;
diff --git a/src/query/service/src/storages/stage/stage_source.rs b/src/query/service/src/storages/stage/stage_source.rs
index 5659e4dd8cb4..12ce68f92264 100644
--- a/src/query/service/src/storages/stage/stage_source.rs
+++ b/src/query/service/src/storages/stage/stage_source.rs
@@ -82,6 +82,7 @@ impl StageSourceHelper {
         } else {
             OperatorInfo::Cfg(stage_info.stage_params.storage.clone())
         };
+
         let src = StageSourceHelper {
             ctx,
             operator_info,
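`copy_reader_with_aligner` fills each fixed-size batch with repeated `read` calls, so short reads from the object store do not turn into a stream of tiny batches. The same fill loop in isolation (a synchronous `std::io` sketch for brevity; the patch itself uses the async range reader, and `fill_batch` is an illustrative name):

```rust
use std::io::Read;

/// Fill `buf` as far as possible from `reader`, returning the number of
/// bytes read; less than `buf.len()` means EOF was reached mid-batch.
fn fill_batch<R: Read>(reader: &mut R, buf: &mut [u8]) -> std::io::Result<usize> {
    let mut n = 0;
    while n < buf.len() {
        let read = reader.read(&mut buf[n..])?;
        if read == 0 {
            break; // EOF: return the partial batch
        }
        n += read;
    }
    Ok(n)
}

fn main() -> std::io::Result<()> {
    let data = b"abcdefgh";
    let mut reader = &data[..];
    let mut batch = vec![0u8; 5];
    assert_eq!(fill_batch(&mut reader, &mut batch)?, 5);
    assert_eq!(fill_batch(&mut reader, &mut batch)?, 3); // partial final batch
    assert_eq!(fill_batch(&mut reader, &mut batch)?, 0); // EOF ends the loop
    Ok(())
}
```

A zero-byte fill is the loop's termination signal, which is why the patch breaks out of its `loop` only when `n == 0` rather than on the first short read.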