From ec1179c3f46840a279a793d562ee6b8c20495d83 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 22 Sep 2022 16:49:55 +0800 Subject: [PATCH 01/24] fix typo. --- src/query/ast/src/ast/format/ast_format.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index 114d4b53f03ea..32504a01e8e59 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -955,13 +955,13 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { fn visit_insert_source(&mut self, insert_source: &'ast InsertSource<'ast>) { match insert_source { InsertSource::Streaming { format, .. } => { - let streaming_name = format!("StreamSouce {}", format); + let streaming_name = format!("StreamSource {}", format); let streaming_format_ctx = AstFormatContext::new(streaming_name); let streaming_node = FormatTreeNode::new(streaming_format_ctx); self.children.push(streaming_node); } InsertSource::Values { .. } => { - let values_name = "ValueSouce".to_string(); + let values_name = "ValueSource".to_string(); let values_format_ctx = AstFormatContext::new(values_name); let values_node = FormatTreeNode::new(values_format_ctx); self.children.push(values_node); From 3850540b6ceb3523d087d6e9c2d641fa04b8236c Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 09:16:50 +0800 Subject: [PATCH 02/24] refactor(insert): InsertSource::Streaming with start offset of data. 
--- src/query/ast/src/ast/format/syntax/dml.rs | 9 +++++++-- src/query/ast/src/ast/statements/insert.rs | 20 +++++++++++++++---- src/query/ast/src/parser/statement.rs | 12 +++++++---- .../src/interpreters/interpreter_insert_v2.rs | 3 ++- src/query/service/src/servers/http/v1/load.rs | 9 +++------ .../service/src/sql/planner/binder/insert.rs | 10 ++++++---- .../service/src/sql/planner/plans/insert.rs | 4 ++-- 7 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/query/ast/src/ast/format/syntax/dml.rs b/src/query/ast/src/ast/format/syntax/dml.rs index 4d20313b4631f..9ddf1d6b8d6ec 100644 --- a/src/query/ast/src/ast/format/syntax/dml.rs +++ b/src/query/ast/src/ast/format/syntax/dml.rs @@ -69,13 +69,18 @@ pub(crate) fn pretty_insert(insert_stmt: InsertStmt) -> RcDoc { fn pretty_source(source: InsertSource) -> RcDoc { RcDoc::line().append(match source { - InsertSource::Streaming { format, rest_str } => RcDoc::text("FORMAT") + InsertSource::Streaming { + format, + rest_str, + start, + } => RcDoc::text("FORMAT") .append(RcDoc::space()) .append(RcDoc::text(format)) .append( RcDoc::line() .nest(NEST_FACTOR) - .append(RcDoc::text(rest_str.to_string())), + .append(RcDoc::text(rest_str.to_string())) + .append(RcDoc::text(start.to_string())), ), InsertSource::Values { rest_str } => RcDoc::text("VALUES").append( RcDoc::line() diff --git a/src/query/ast/src/ast/statements/insert.rs b/src/query/ast/src/ast/statements/insert.rs index 5e5fc520104b7..06baca647e51d 100644 --- a/src/query/ast/src/ast/statements/insert.rs +++ b/src/query/ast/src/ast/statements/insert.rs @@ -56,15 +56,27 @@ impl Display for InsertStmt<'_> { #[derive(Debug, Clone, PartialEq)] pub enum InsertSource<'a> { - Streaming { format: String, rest_str: &'a str }, - Values { rest_str: &'a str }, - Select { query: Box> }, + Streaming { + format: String, + rest_str: &'a str, + start: usize, + }, + Values { + rest_str: &'a str, + }, + Select { + query: Box>, + }, } impl Display for InsertSource<'_> { fn 
fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - InsertSource::Streaming { format, rest_str } => { + InsertSource::Streaming { + format, + rest_str, + start: _, + } => { write!(f, "FORMAT {format} {rest_str}") } InsertSource::Values { rest_str } => write!(f, "VALUES {rest_str}"), diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 74fcc674d999c..550e2ba6d0ff0 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -995,16 +995,17 @@ pub fn insert_source(i: Input) -> IResult { rule! { FORMAT ~ #ident ~ #rest_str }, - |(_, format, rest_str)| InsertSource::Streaming { + |(_, format, (rest_str, start))| InsertSource::Streaming { format: format.name, rest_str, + start, }, ); let values = map( rule! { VALUES ~ #rest_str }, - |(_, rest_str)| InsertSource::Values { rest_str }, + |(_, (rest_str, _))| InsertSource::Values { rest_str }, ); let query = map(query, |query| InsertSource::Select { query: Box::new(query), @@ -1018,13 +1019,16 @@ pub fn insert_source(i: Input) -> IResult { } #[allow(clippy::needless_lifetimes)] -pub fn rest_str<'a>(i: Input<'a>) -> IResult<&'a str> { +pub fn rest_str<'a>(i: Input<'a>) -> IResult<(&'a str, usize)> { // It's safe to unwrap because input must contain EOI. 
let first_token = i.0.first().unwrap(); let last_token = i.0.last().unwrap(); Ok(( i.slice((i.len() - 1)..), - &first_token.source[first_token.span.start..last_token.span.end], + ( + &first_token.source[first_token.span.start..last_token.span.end], + first_token.span.start, + ), )) } diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index 93884d09118e6..05b29ddbeaa67 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -171,7 +171,8 @@ impl Interpreter for InsertInterpreterV2 { } build_res.main_pipeline.add_pipe(builder.finalize()); } - InsertInputSource::StreamingWithFormat(_, input_context) => { + InsertInputSource::StreamingWithFormat(_, _, input_context) => { + let input_context = input_context.as_ref().expect("must success").clone(); input_context .format .exec_stream(input_context.clone(), &mut build_res.main_pipeline)?; diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index 5d7309f23cb7d..7bf8ebf50f68f 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -120,7 +120,8 @@ pub async fn streaming_load( let schema = plan.schema(); match &mut plan { Plan::Insert(insert) => match &mut insert.source { - InsertInputSource::StrWithFormat((sql_rest, format)) => { + InsertInputSource::StreamingWithFormat(format, start, input_context_ref) => { + let sql_rest = &insert_sql[*start..].trim(); if !sql_rest.is_empty() { return Err(poem::Error::from_string( "should NOT have data after `Format` in streaming load.", @@ -139,13 +140,9 @@ pub async fn streaming_load( .await .map_err(InternalServerError)?, ); + *input_context_ref = Some(input_context.clone()); tracing::info!("streaming load {:?}", input_context); - insert.source = InsertInputSource::StreamingWithFormat( - format.to_string(), - 
input_context.clone(), - ); - let handler = context.spawn(execute_query(context.clone(), plan)); let files = read_multi_part(multipart, tx, &input_context).await?; diff --git a/src/query/service/src/sql/planner/binder/insert.rs b/src/query/service/src/sql/planner/binder/insert.rs index e70338a766d16..4db4b296d98a8 100644 --- a/src/query/service/src/sql/planner/binder/insert.rs +++ b/src/query/service/src/sql/planner/binder/insert.rs @@ -78,10 +78,12 @@ impl<'a> Binder { }; let input_source: Result = match source.clone() { - InsertSource::Streaming { format, rest_str } => { - self.analyze_stream_format(rest_str, Some(format)).await - } - InsertSource::Values { rest_str, .. } => { + InsertSource::Streaming { + format, + rest_str, + start, + } => Ok(InsertInputSource::StreamingWithFormat(format, start, None)), + InsertSource::Values { rest_str } => { let str = rest_str.trim_end_matches(';'); self.analyze_stream_format(str, Some("VALUES".to_string())) .await diff --git a/src/query/service/src/sql/planner/plans/insert.rs b/src/query/service/src/sql/planner/plans/insert.rs index d714e4769fe46..43db0fb548778 100644 --- a/src/query/service/src/sql/planner/plans/insert.rs +++ b/src/query/service/src/sql/planner/plans/insert.rs @@ -27,7 +27,7 @@ pub enum InsertInputSource { SelectPlan(Box), // From outside streaming source #[serde(skip)] - StreamingWithFormat(String, Arc), + StreamingWithFormat(String, usize, Option>), // From cloned String and format StrWithFormat((String, String)), } @@ -70,7 +70,7 @@ impl Insert { pub fn format(&self) -> Option<&str> { match &self.source { InsertInputSource::SelectPlan(_) => None, - InsertInputSource::StreamingWithFormat(v, _) => Some(v.as_str()), + InsertInputSource::StreamingWithFormat(v, ..) 
=> Some(v.as_str()), InsertInputSource::StrWithFormat((_, v)) => Some(v.as_str()), } } From dddb7543118245351eef34aae0dc30148af8d1ff Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 12:02:45 +0800 Subject: [PATCH 03/24] refactor(input_format): rm InputPlan::Clickhouse. --- .../processors/sources/input_formats/input_context.rs | 9 ++++++--- .../processors/sources/input_formats/input_pipeline.rs | 10 ++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs index 83c07f5303649..63fe523654afe 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -49,7 +49,6 @@ const MIN_ROW_PER_BLOCK: usize = 800 * 1000; pub enum InputPlan { CopyInto(Box), StreamingLoad(StreamPlan), - Clickhouse, } impl InputPlan { @@ -69,6 +68,7 @@ pub struct CopyIntoPlan { #[derive(Debug)] pub struct StreamPlan { + pub is_multi_part: bool, pub compression: StageFileCompression, } @@ -215,6 +215,7 @@ impl InputContext { settings: Arc, schema: DataSchemaRef, scan_progress: Arc, + is_multi_part: bool, ) -> Result { let format_type = StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?; @@ -238,7 +239,10 @@ impl InputContext { } else { StageFileCompression::Auto }; - let plan = StreamPlan { compression }; + let plan = StreamPlan { + is_multi_part, + compression, + }; Ok(InputContext { format, @@ -294,7 +298,6 @@ impl InputContext { let opt = match &self.plan { InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression, InputPlan::StreamingLoad(p) => p.compression, - _ => StageFileCompression::None, }; Self::get_compression_alg_copy(opt, path) } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs 
b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index 2b24a0c92b3ca..28742a5e2bee0 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -33,6 +33,7 @@ use opendal::io_util::CompressAlgorithm; use crate::processors::sources::input_formats::input_context::InputContext; use crate::processors::sources::input_formats::input_context::InputPlan; +use crate::processors::sources::input_formats::input_context::StreamPlan; use crate::processors::sources::input_formats::input_format::SplitInfo; use crate::processors::sources::input_formats::source_aligner::Aligner; use crate::processors::sources::input_formats::source_deserializer::DeserializeSource; @@ -210,8 +211,13 @@ pub trait InputFormatPipe: Sized + Send + 'static { let n_threads = ctx.settings.get_max_threads()? as usize; let max_aligner = match ctx.plan { InputPlan::CopyInto(_) => ctx.splits.len(), - InputPlan::StreamingLoad(_) => 3, - InputPlan::Clickhouse => 1, + InputPlan::StreamingLoad(StreamPlan { is_multi_part, .. }) => { + if is_multi_part { + 3 + } else { + 1 + } + } }; let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads); for _ in 0..std::cmp::min(max_aligner, n_threads) { From 63bdfc47eb7a4953d4a2ba1fc00fbf6698742f16 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 12:03:50 +0800 Subject: [PATCH 04/24] refactor(input_format): rm InputPlan::Clickhouse. 
--- src/query/service/src/servers/http/v1/load.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index 7bf8ebf50f68f..3511220f5799c 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -136,6 +136,7 @@ pub async fn streaming_load( context.get_settings(), schema, context.get_scan_progress(), + true, ) .await .map_err(InternalServerError)?, From 407024bef43c08800382b8688afb6ea94f262497 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 12:06:37 +0800 Subject: [PATCH 05/24] refactor(clickhouse_handler): use new InputFormat. fmt insert.rs --- .../src/interpreters/interpreter_insert_v2.rs | 49 ++------ .../src/servers/http/clickhouse_handler.rs | 114 ++++++++++++++---- .../service/src/sql/planner/binder/insert.rs | 31 ++--- .../service/src/sql/planner/plans/insert.rs | 4 +- 4 files changed, 115 insertions(+), 83 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_insert_v2.rs b/src/query/service/src/interpreters/interpreter_insert_v2.rs index 05b29ddbeaa67..41353f87080dc 100644 --- a/src/query/service/src/interpreters/interpreter_insert_v2.rs +++ b/src/query/service/src/interpreters/interpreter_insert_v2.rs @@ -27,7 +27,6 @@ use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; -use common_formats::FormatFactory; use common_formats::InputFormat; use common_io::prelude::BufferRead; use common_io::prelude::BufferReadExt; @@ -37,7 +36,6 @@ use common_io::prelude::NestedCheckpointReader; use common_pipeline_sources::processors::sources::AsyncSource; use common_pipeline_sources::processors::sources::AsyncSourcer; use common_pipeline_sources::processors::sources::SyncSource; -use common_pipeline_sources::processors::sources::SyncSourcer; use common_pipeline_transforms::processors::transforms::Transform; use 
common_planner::Metadata; use common_planner::MetadataRef; @@ -132,43 +130,20 @@ impl Interpreter for InsertInterpreterV2 { ); } else { match &self.plan.source { - InsertInputSource::StrWithFormat((str, format)) => { + InsertInputSource::Values(data) => { let output_port = OutputPort::create(); - let format_settings = self.ctx.get_format_settings()?; - match format.as_str() { - "VALUES" => { - let settings = self.ctx.get_settings(); - let name_resolution_ctx = - NameResolutionContext::try_from(settings.as_ref())?; - let inner = ValueSource::new( - str.to_string(), - self.ctx.clone(), - name_resolution_ctx, - plan.schema(), - ); - let source = - AsyncSourcer::create(self.ctx.clone(), output_port.clone(), inner)?; - builder.add_source(output_port, source); - } - - _ => { - let input_format = FormatFactory::instance().get_input( - format.as_str(), - plan.schema(), - format_settings, - )?; - let inner = FormatSource { - data: str.to_string(), - input_format, - blocks: vec![], - is_finished: false, - }; + let settings = self.ctx.get_settings(); + let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let inner = ValueSource::new( + data.to_string(), + self.ctx.clone(), + name_resolution_ctx, + plan.schema(), + ); + let source = + AsyncSourcer::create(self.ctx.clone(), output_port.clone(), inner)?; + builder.add_source(output_port, source); - let source = - SyncSourcer::create(self.ctx.clone(), output_port.clone(), inner)?; - builder.add_source(output_port, source); - } - } build_res.main_pipeline.add_pipe(builder.finalize()); } InsertInputSource::StreamingWithFormat(_, _, input_context) => { diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index acd82f2574655..59acbbf8cfd21 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -17,16 +17,22 @@ use std::str::FromStr; use std::sync::Arc; 
use async_stream::stream; +use common_base::base::tokio; +use common_base::base::tokio::sync::mpsc::Sender; +use common_base::base::tokio::task::JoinHandle; +use common_base::base::TrySpawn; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; use common_exception::ToErrorCode; use common_formats::output_format::OutputFormatType; -use common_streams::SendableDataBlockStream; +use common_pipeline_sources::processors::sources::input_formats::InputContext; +use common_pipeline_sources::processors::sources::input_formats::StreamingReadBatch; use futures::StreamExt; use http::HeaderMap; use naive_cityhash::cityhash128; +use opendal::io_util::CompressAlgorithm; use poem::error::BadRequest; use poem::error::InternalServerError; use poem::error::Result as PoemResult; @@ -41,19 +47,16 @@ use poem::IntoResponse; use poem::Route; use serde::Deserialize; use serde::Serialize; -use tracing::error; use tracing::info; use crate::interpreters::InterpreterFactory; use crate::interpreters::InterpreterPtr; -use crate::pipelines::processors::port::OutputPort; -use crate::pipelines::processors::StreamSource; -use crate::pipelines::SourcePipeBuilder; use crate::servers::http::v1::HttpQueryContext; use crate::servers::http::CLickHouseFederated; use crate::sessions::QueryContext; use crate::sessions::SessionType; use crate::sessions::TableContext; +use crate::sql::plans::InsertInputSource; use crate::sql::plans::Plan; use crate::sql::Planner; @@ -102,20 +105,10 @@ async fn execute( interpreter: InterpreterPtr, schema: DataSchemaRef, format: OutputFormatType, - input_stream: Option, params: StatementHandlerParams, + handle: Option>, ) -> Result> { - let mut data_stream: SendableDataBlockStream = { - let output_port = OutputPort::create(); - let stream_source = StreamSource::create(ctx.clone(), input_stream, output_port.clone())?; - let mut source_pipe_builder = SourcePipeBuilder::create(); - 
source_pipe_builder.add_source(output_port, stream_source); - let _ = interpreter - .set_source_pipe_builder(Option::from(source_pipe_builder)) - .map_err(|e| error!("interpreter.set_source_pipe_builder.error: {:?}", e)); - interpreter.execute(ctx.clone()).await? - }; - + let mut data_stream = interpreter.execute(ctx.clone()).await?; let format_setting = ctx.get_format_settings()?; let mut output_format = format.create_format(schema, format_setting); let prefix = Ok(output_format.serialize_prefix()?); @@ -164,10 +157,12 @@ async fn execute( if ok { yield compress_fn(output_format.finalize()); } - // to hold session ref until stream is all consumed let _ = session.get_id(); }; + if let Some(handle) = handle { + handle.await.expect("must") + } Ok(Body::from_bytes_stream(stream).with_content_type(format.get_content_type())) } @@ -208,7 +203,7 @@ pub async fn clickhouse_handler_get( let interpreter = InterpreterFactory::get(context.clone(), &plan) .await .map_err(BadRequest)?; - execute(context, interpreter, plan.schema(), format, None, params) + execute(context, interpreter, plan.schema(), format, params, None) .await .map_err(InternalServerError) } @@ -254,18 +249,55 @@ pub async fn clickhouse_handler_post( return serialize_one_block(ctx.clone(), block, &sql, ¶ms, default_format) .map_err(InternalServerError); } - let mut planner = Planner::new(ctx.clone()); - let (plan, _, fmt) = planner.plan_sql(&sql).await.map_err(BadRequest)?; + let (mut plan, _, fmt) = planner.plan_sql(&sql).await.map_err(BadRequest)?; + let schema = plan.schema(); + ctx.attach_query_str(plan.to_string(), &sql); + let mut handle = None; + if let Plan::Insert(insert) = &mut plan { + if let InsertInputSource::StreamingWithFormat(format, start, input_context_ref) = + &mut insert.source + { + let (tx, rx) = tokio::sync::mpsc::channel(2); + let input_context = Arc::new( + InputContext::try_create_from_insert( + format.as_str(), + rx, + ctx.get_settings(), + schema, + ctx.get_scan_progress(), + false, 
+ ) + .await + .map_err(InternalServerError)?, + ); + *input_context_ref = Some(input_context.clone()); + info!( + "clickhouse insert with format {:?}, value {}", + input_context, *start + ); + let compression_alg = input_context.get_compression_alg("").map_err(BadRequest)?; + let start = *start; + handle = Some(ctx.spawn(async move { + gen_batches( + sql, + start, + input_context.read_batch_size, + tx, + compression_alg, + ) + .await + })); + } + }; let format = get_format_with_default(fmt, default_format)?; let format = get_format_from_plan(&plan, format)?; - ctx.attach_query_str(plan.to_string(), &sql); let interpreter = InterpreterFactory::get(ctx.clone(), &plan) .await .map_err(BadRequest)?; - execute(ctx, interpreter, plan.schema(), format, None, params) + execute(ctx, interpreter, plan.schema(), format, params, handle) .await .map_err(InternalServerError) } @@ -383,3 +415,39 @@ fn get_format_with_default( Some(name) => OutputFormatType::from_str(&name).map_err(BadRequest), } } + +async fn gen_batches( + data: String, + start: usize, + batch_size: usize, + tx: Sender>, + compression: Option, +) { + let buf = &data.as_bytes()[start..]; + let buf_size = buf.len(); + let mut is_start = true; + let mut start = 0; + let path = "clickhouse_handler_body".to_string(); + while start < buf_size { + let data = if buf_size - start >= batch_size { + buf[start..batch_size].to_vec() + } else { + buf[start..].to_vec() + }; + + tracing::debug!("sending read {} bytes", data.len()); + if let Err(e) = tx + .send(Ok(StreamingReadBatch { + data, + path: path.clone(), + is_start, + compression, + })) + .await + { + tracing::warn!(" Multipart fail to send ReadBatch: {}", e); + } + is_start = false; + start += batch_size + } +} diff --git a/src/query/service/src/sql/planner/binder/insert.rs b/src/query/service/src/sql/planner/binder/insert.rs index 4db4b296d98a8..df58b962b71d6 100644 --- a/src/query/service/src/sql/planner/binder/insert.rs +++ 
b/src/query/service/src/sql/planner/binder/insert.rs @@ -19,7 +19,6 @@ use common_ast::ast::InsertStmt; use common_ast::ast::Statement; use common_datavalues::DataSchemaRefExt; use common_exception::Result; -use tracing::debug; use crate::sql::binder::Binder; use crate::sql::normalize_identifier; @@ -82,11 +81,17 @@ impl<'a> Binder { format, rest_str, start, - } => Ok(InsertInputSource::StreamingWithFormat(format, start, None)), + } => { + if format.to_uppercase() == "VALUES" { + let data = rest_str.trim_end_matches(';').trim_start().to_owned(); + Ok(InsertInputSource::Values(data)) + } else { + Ok(InsertInputSource::StreamingWithFormat(format, start, None)) + } + } InsertSource::Values { rest_str } => { - let str = rest_str.trim_end_matches(';'); - self.analyze_stream_format(str, Some("VALUES".to_string())) - .await + let data = rest_str.trim_end_matches(';').trim_start().to_owned(); + Ok(InsertInputSource::Values(data)) } InsertSource::Select { query } => { let statement = Statement::Query(query); @@ -111,20 +116,4 @@ impl<'a> Binder { Ok(Plan::Insert(Box::new(plan))) } - - pub(in crate::sql::planner::binder) async fn analyze_stream_format( - &self, - stream_str: &'a str, - format: Option, - ) -> Result { - let stream_str = stream_str.trim_start(); - debug!("{:?}", stream_str); - let format = format.map(|v| v.to_uppercase()); - // TODO migrate format into format factory and avoid the str clone - let data = stream_str.to_owned(); - Ok(InsertInputSource::StrWithFormat(( - data, - format.unwrap_or_else(|| "VALUES".to_string()), - ))) - } } diff --git a/src/query/service/src/sql/planner/plans/insert.rs b/src/query/service/src/sql/planner/plans/insert.rs index 43db0fb548778..7cd038ab8822f 100644 --- a/src/query/service/src/sql/planner/plans/insert.rs +++ b/src/query/service/src/sql/planner/plans/insert.rs @@ -29,7 +29,7 @@ pub enum InsertInputSource { #[serde(skip)] StreamingWithFormat(String, usize, Option>), // From cloned String and format - StrWithFormat((String, 
String)), + Values(String), } #[derive(serde::Serialize, serde::Deserialize, Clone)] @@ -70,7 +70,7 @@ impl Insert { match &self.source { InsertInputSource::SelectPlan(_) => None, InsertInputSource::StreamingWithFormat(v, ..) => Some(v.as_str()), - InsertInputSource::StrWithFormat((_, v)) => Some(v.as_str()), + InsertInputSource::Values(v) => Some(v.as_str()), } } } From de2a7191d0b31bef5d238d50210afbe57837afc7 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 15:16:00 +0800 Subject: [PATCH 06/24] feat(format): accept TabSeparated as alias for TSV. --- src/meta/types/src/user_stage.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs index 500a713973f86..b2ebcfe3567f9 100644 --- a/src/meta/types/src/user_stage.rs +++ b/src/meta/types/src/user_stage.rs @@ -135,17 +135,16 @@ impl FromStr for StageFileFormatType { fn from_str(s: &str) -> std::result::Result { match s.to_uppercase().as_str() { "CSV" => Ok(StageFileFormatType::Csv), - "TSV" => Ok(StageFileFormatType::Tsv), + "TSV" | "TABSEPARATED" => Ok(StageFileFormatType::Tsv), "JSON" => Ok(StageFileFormatType::Json), "NDJSON" => Ok(StageFileFormatType::NdJson), "AVRO" => Ok(StageFileFormatType::Avro), "ORC" => Ok(StageFileFormatType::Orc), "PARQUET" => Ok(StageFileFormatType::Parquet), "XML" => Ok(StageFileFormatType::Xml), - _ => Err( - "Unknown file format type, must one of { CSV | JSON | AVRO | ORC | PARQUET | XML }" - .to_string(), - ), + _ => Err(format!( + "Unknown file format type '{s}', must one of ( CSV | TSV | JSON | AVRO | ORC | PARQUET | XML)" + )), } } } From 7557e73119699bf26e09af4a4e4f1ce4c6d99f2b Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 15:17:27 +0800 Subject: [PATCH 07/24] fix(format): fix cases when tsv lacks columns. 
--- .../sources/input_formats/impls/input_format_tsv.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index b63d737c63ad1..77ca131cb3988 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -76,12 +76,12 @@ impl InputFormatTSV { } pos += 1; } - if column_index < num_columns - 1 { + if column_index < num_columns { // todo(youngsofun): allow it optionally (set default) err_msg = Some(format!( "need {} columns, find {} only", num_columns, - column_index + 1 + column_index )); } if let Some(m) = err_msg { From a96f4270d207f10d5a43f07548993c202195fb41 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 15:18:43 +0800 Subject: [PATCH 08/24] feat(format): accept clickhouse formats suffix WithNamesAndTypes. --- .../sources/input_formats/input_context.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs index 63fe523654afe..207464c5f089d 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -217,6 +217,9 @@ impl InputContext { scan_progress: Arc, is_multi_part: bool, ) -> Result { + let (format_name, rows_to_skip) = remove_clickhouse_format_suffix(format_name); + let rows_to_skip = std::cmp::max(settings.get_skip_header()? 
as usize, rows_to_skip); + let format_type = StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?; let format = Self::get_input_format(&format_type)?; @@ -232,7 +235,6 @@ impl InputContext { } }; let record_delimiter = RecordDelimiter::try_from(&settings.get_record_delimiter()?[..])?; - let rows_to_skip = settings.get_skip_header()? as usize; let compression = settings.get_compression()?; let compression = if !compression.is_empty() { StageFileCompression::from_str(&compression).map_err(ErrorCode::BadArguments)? @@ -328,3 +330,18 @@ impl InputContext { Ok(compression_algo) } } + +const WITH_NAMES_AND_TYPES: &str = "withnamesandtypes"; +const WITH_NAMES: &str = "withnames"; + +fn remove_clickhouse_format_suffix(name: &str) -> (&str, usize) { + let s = name.to_lowercase(); + let (suf_len, skip) = if s.ends_with(WITH_NAMES_AND_TYPES) { + (WITH_NAMES_AND_TYPES.len(), 2) + } else if s.ends_with(WITH_NAMES) { + (WITH_NAMES.len(), 1) + } else { + (0, 0) + }; + (&name[0..(s.len() - suf_len)], skip) +} From ab4242a4c7881c807bee11afe88c256ff54ad4de Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 15:25:33 +0800 Subject: [PATCH 09/24] fix(test): csv use double quote by default. select without float sum. 
--- ...4_0004_http_clickhouse_input_format.result | 1 + .../14_0004_http_clickhouse_input_format.sh | 30 ++++++++++++------- 2 files changed, 20 insertions(+), 11 deletions(-) create mode 100755 tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.result diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.result b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.result new file mode 100755 index 0000000000000..988c0caae694a --- /dev/null +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.result @@ -0,0 +1 @@ +10017 3300 2 diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh index 43e75addec978..00829c585119c 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0004_http_clickhouse_input_format.sh @@ -5,37 +5,43 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) cat << EOF > /tmp/databend_test_csv.txt -insert into a(a,b,c) format CSV 100,'2',100.3 -200,'3',200.4 -300,'2',300 +insert into a(a,b,c) format CSV 100,"2",100.3 +200,"3",200.4 +300,"2",300 EOF cat << EOF > /tmp/databend_test_csv_names.txt insert into a(a,b,c) format CSVWithNames a,b,c -100,'2',100.3 -200,'3',200.4 -300,'2',300 +100,"2",100.3 +200,"3",200.4 +300,"2",300 EOF cat << EOF > /tmp/databend_test_csv_names_and_types.txt insert into a(a,b,c) format CSVWithNamesAndTypes a,b,c -'int','varchar','double' -100,'2',100.3 -200,'3',200.4 -300,'2',300 +"int","varchar","double" +100,"2",100.3 +200,"3",200.4 +300,"2",300 EOF +cat << EOF > /tmp/databend_test_tsv.txt +insert into a(a,b,c) format TabSeparatedWithNames xx +100 2 100.3 +200 3 200.4 +300 2 300 +EOF + cat << EOF > 
/tmp/databend_test_tsv_names_and_types.txt insert into a(a,b,c) format TabSeparatedWithNamesAndTypes a b c 'int' 'varchar' 'double' 100 2 100.3 200 3 200.4 300 2 300 - EOF @@ -56,6 +62,7 @@ for i in `seq 1 10000`;do echo '{"a": 0, "b": "3", "c": 0}' >> /tmp/databend_test_ndjson.txt done +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" -d "drop table if exists a" curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" -d "create table a ( a int, b varchar, c double)" @@ -69,6 +76,7 @@ curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_POR # Flaky test: wait for https://github.com/datafuselabs/databend/issues/7657 #curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" -d "SELECT count(), sum(a), min(b), sum(c) from a" +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" -d "SELECT count(), sum(a), min(b) from a" curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}" -d "drop table a" rm /tmp/databend_test*.txt From 1d92b6b44aea6d66dc2d65f886f4a2582a61337d Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:17:52 +0800 Subject: [PATCH 10/24] fix(clickhouse_handler): trim start of insert data. 
--- src/query/service/src/servers/http/clickhouse_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index 59acbbf8cfd21..61c11f0df4792 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -423,7 +423,7 @@ async fn gen_batches( tx: Sender>, compression: Option, ) { - let buf = &data.as_bytes()[start..]; + let buf = &data.trim_start().as_bytes()[start..]; let buf_size = buf.len(); let mut is_start = true; let mut start = 0; From 604391ca6f74865c362115533abbb02820957a60 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:20:32 +0800 Subject: [PATCH 11/24] feat(csv): CSV allow ending with ',', but should not have data after it --- .../input_formats/impls/input_format_csv.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index 3ce1229a2f7f5..74299996e9454 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -178,6 +178,16 @@ impl InputFormatTextBase for InputFormatCSV { &state.path, state.rows, )); + } else if endlen == num_fields + 1 { + if field_ends[num_fields] != field_ends[num_fields - 1] { + return Err(csv_error( + &format!( + "CSV allow ending with ',', but should not have data after it" + ), + &state.path, + state.rows, + )); + } } state.rows_to_skip -= 1; @@ -250,6 +260,16 @@ impl InputFormatTextBase for InputFormatCSV { &state.path, start_row + row_batch.row_ends.len(), )); + } else if endlen == num_fields + 1 { + if field_ends[num_fields] != field_ends[num_fields - 1] { + 
return Err(csv_error( + &format!( + "CSV allow ending with ',', but should not have data after it" + ), + &state.path, + start_row + row_batch.row_ends.len(), + )); + } } row_batch .field_ends From 08f392803be4cb8cf65af6ccc749fcca0109bc6e Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:22:57 +0800 Subject: [PATCH 12/24] fix(tsv): wrong err msg. --- .../sources/input_formats/impls/input_format_tsv.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index 77ca131cb3988..9ce04c555e16f 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -76,14 +76,14 @@ impl InputFormatTSV { } pos += 1; } - if column_index < num_columns { + if err_msg.is_none() && column_index < num_columns { // todo(youngsofun): allow it optionally (set default) err_msg = Some(format!( "need {} columns, find {} only", - num_columns, - column_index + num_columns, column_index )); } + if let Some(m) = err_msg { let row_info = if let Some(r) = row_index { format!("at row {},", r) From 4e8b20f9dc128f7f78566e5bd17e3435278bc564 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:23:42 +0800 Subject: [PATCH 13/24] feat(format): add format_column_error() --- .../input_formats/impls/input_format_csv.rs | 1 + .../input_formats/impls/input_format_tsv.rs | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index 74299996e9454..c4003acb310d9 100644 --- 
a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -26,6 +26,7 @@ use common_settings::Settings; use csv_core::ReadRecordResult; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::impls::input_format_tsv::format_column_error; use crate::processors::sources::input_formats::input_format_text::get_time_zone; use crate::processors::sources::input_formats::input_format_text::AligningState; use crate::processors::sources::input_formats::input_format_text::BlockBuilder; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index 9ce04c555e16f..a2a6d0681f619 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -59,10 +59,7 @@ impl InputFormatTSV { if let Err(e) = deserializers[column_index].de_text(&mut reader, format_settings) { - err_msg = Some(format!( - "fail to decode column {}: {:?}, [column_data]=[{}]", - column_index, e, "" - )); + err_msg = Some(format_column_error(column_index, col_data, &e.message())); break; }; // todo(youngsofun): check remaining data @@ -158,3 +155,12 @@ impl InputFormatTextBase for InputFormatTSV { Ok(state.align_by_record_delimiter(buf)) } } + +pub fn format_column_error(column_index: usize, col_data: &[u8], msg: &str) -> String { + let mut data = String::new(); + verbose_string(col_data, &mut data); + format!( + "fail to decode column {}: {}, [column_data]=[{}]", + column_index, msg, data + ) +} From 3303bea5e4927c3d94775491490f5e3f2f55ea97 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:24:15 +0800 Subject: [PATCH 14/24] 
feat(csv): check ending of field. --- .../input_formats/impls/input_format_csv.rs | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index c4003acb310d9..ee673b86e1bd0 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use common_datavalues::TypeDeserializer; use common_exception::ErrorCode; use common_exception::Result; -use common_formats::verbose_string; +use common_io::prelude::BufferReadExt; use common_io::prelude::FormatSettings; use common_io::prelude::NestedCheckpointReader; use common_meta_types::StageFileFormatType; @@ -56,16 +56,14 @@ impl InputFormatCSV { // reader.ignores(|c: u8| c == b' ').expect("must success"); // todo(youngsofun): do not need escape, already done in csv-core if let Err(e) = deserializer.de_text(&mut reader, format_settings) { - let mut value = String::new(); - verbose_string(buf, &mut value); - let err_msg = format!( - "fail to decode column {}: {:?}, [column_data]=[{}]", - c, - e.message(), - value - ); + let err_msg = format_column_error(c, col_data, &e.message()); return Err(csv_error(&err_msg, path, row_index)); }; + reader.ignore_white_spaces().expect("must success"); + if reader.must_eof().is_err() { + let err_msg = format_column_error(c, col_data, "bad field end"); + return Err(csv_error(&err_msg, path, row_index)); + } } field_start = field_end; } @@ -179,16 +177,14 @@ impl InputFormatTextBase for InputFormatCSV { &state.path, state.rows, )); - } else if endlen == num_fields + 1 { - if field_ends[num_fields] != field_ends[num_fields - 1] { - return Err(csv_error( - &format!( - "CSV allow ending with ',', but should not have 
data after it" - ), - &state.path, - state.rows, - )); - } + } else if endlen == num_fields + 1 + && field_ends[num_fields] != field_ends[num_fields - 1] + { + return Err(csv_error( + "CSV allow ending with ',', but should not have data after it", + &state.path, + state.rows, + )); } state.rows_to_skip -= 1; @@ -261,16 +257,14 @@ impl InputFormatTextBase for InputFormatCSV { &state.path, start_row + row_batch.row_ends.len(), )); - } else if endlen == num_fields + 1 { - if field_ends[num_fields] != field_ends[num_fields - 1] { - return Err(csv_error( - &format!( - "CSV allow ending with ',', but should not have data after it" - ), - &state.path, - start_row + row_batch.row_ends.len(), - )); - } + } else if endlen == num_fields + 1 + && field_ends[num_fields] != field_ends[num_fields - 1] + { + return Err(csv_error( + "CSV allow ending with ',', but should not have data after it", + &state.path, + start_row + row_batch.row_ends.len(), + )); } row_batch .field_ends From 279d630a027f25a89dccccddf024332cd2a69fd0 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:24:26 +0800 Subject: [PATCH 15/24] feat(tsv): check ending of field. 
tsv end --- .../sources/input_formats/impls/input_format_tsv.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs index a2a6d0681f619..d252292c59b66 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_tsv.rs @@ -62,7 +62,12 @@ impl InputFormatTSV { err_msg = Some(format_column_error(column_index, col_data, &e.message())); break; }; - // todo(youngsofun): check remaining data + reader.ignore_white_spaces().expect("must success"); + if reader.must_eof().is_err() { + err_msg = + Some(format_column_error(column_index, col_data, "bad field end")); + break; + } } column_index += 1; field_start = pos + 1; From fc00a5180b36bc646c1595d15f025dcdc01e9724 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 17:25:35 +0800 Subject: [PATCH 16/24] test(format): update 14_0007_http_clickhouse_input_format_diagnostic.sh. --- ..._clickhouse_input_format_diagnostic.result | 50 +++++------------ ...http_clickhouse_input_format_diagnostic.sh | 54 ++++++++++--------- 2 files changed, 40 insertions(+), 64 deletions(-) diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result index 23ddb3a453145..25ec058b73e93 100644 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.result @@ -1,43 +1,17 @@ -Code: 4000, displayText = invalid data (Expected to have char '.) 
-Error occurs at row: 1: - Column: 0, Name: a, Type: Timestamp, Parsed text: "19892-02-0" - ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format. - (while in processor thread 0). +csv 1 +1 -Code: 1046, displayText = Parse csv error at line 0 -Error occurs at row: 0: - Column: 0, Name: a, Type: Timestamp, Parsed text: "2023-04-08 01:01:01" - Column: 1, Name: b, Type: String, Parsed text: "Hello" - Column: 2, Name: c, Type: Int32, Parsed text: "12345678" - Error: There is no line feed. "," found instead. - (while in processor thread 0). +csv 2 +1 -Code: 1046, displayText = Parse csv error at line 0 -Error occurs at row: 0: - Column: 0, Name: a, Type: Timestamp, Parsed text: "2023-04-08 01:01:01" - Column: 1, Name: b, Type: String, Parsed text: "" - Column: 2, Name: c, Type: Int32, Parsed text: "123" - Error: There is no line feed. "H" found instead. - (while in processor thread 0). +csv 3 +1 -Code: 1046, displayText = Cannot parse value:'2023-04-0 to Date type, cause: input contains invalid characters -Error occurs at row: 0: - Column: 0, Name: a, Type: Timestamp, Parsed text: "2023-04-0" - ERROR: DateTime must be in YYYY-MM-DD hh:mm:ss format. - (while in processor thread 0). +tsv 1 +1 -Code: 1046, displayText = Parse Tsv error at line 1 -Error occurs at row: 1: - Column: 0, Name: a, Type: Timestamp, Parsed text: "1989-02-03 15:23:23" - Column: 1, Name: b, Type: String, Parsed text: "World" - Column: 2, Name: c, Type: Int32, Parsed text: "123456" - Error: There is no line feed. "1" found instead. - (while in processor thread 0). +tsv 2 +1 -Code: 1046, displayText = Parse Tsv error at line 0 -Error occurs at row: 0: - Column: 0, Name: a, Type: Timestamp, Parsed text: "2023-04-08 01:01:01" - Column: 1, Name: b, Type: String, Parsed text: "" - Column: 2, Name: c, Type: Int32, Parsed text: "123" - Error: There is no line feed. "H" found instead. - (while in processor thread 0). 
\ No newline at end of file +tsv 3 +1 diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh index cf96c44cbc3fe..4c72bd9b0095f 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh @@ -3,58 +3,60 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . "$CURDIR"/../../../shell_env.sh +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "drop table if exists a" +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "create table a ( a datetime, b string, c int)" + +# 1 bad date +echo -e 'csv 1' cat << EOF > /tmp/databend_test_csv_error1.txt insert into a(a,b,c) format CSV -'2023-04-08 01:01:01', "Hello", 12345678 -'19892-02-03 15:23:23', "World", 123456 - +"2023-04-08 01:01:01","Hello",12345678 +"19892-02-03 15:23:23","World",123456 EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error1.txt | grep -c "Date" +# 2 one more column +echo -e '\ncsv 2' cat << EOF > /tmp/databend_test_csv_error2.txt -insert into a(a,b,c) format CSV '2023-04-08 01:01:01', "Hello", 12345678,1 +insert into a(a,b,c) format CSV "2023-04-08 01:01:01", "Hello",12345678,1 EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error2.txt | grep -c "allow ending" +# 3 bad number +echo -e '\ncsv 3' cat << EOF > /tmp/databend_test_csv_error3.txt -insert into a(a,b,c) format CSV '2023-04-08 01:01:01',,123Hello +insert into a(a,b,c) format CSV "2023-04-08 01:01:01",,123Hello 
EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error3.txt | grep -c "column 2: bad field end" +# 1 bad date +echo -e '\ntsv 1' cat << EOF > /tmp/databend_test_tsv_error1.txt insert into a(a,b,c) format TSV -'2023-04-08 01:01:01' "Hello" 12345678 +"2023-04-08 01:01:01" "Hello" 12345678 1989-023-03 15:23:23 "World" 123456 - EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error1.txt | grep -c "Date" +# 2 one more column +echo -e '\ntsv 2' cat << EOF > /tmp/databend_test_tsv_error2.txt insert into a(a,b,c) format TSV 2023-04-08 01:01:01 "Hello" 12345678 1989-02-03 15:23:23 "World" 123456 1 - EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error2.txt | grep -c "column 2: bad field end" +echo -e '\ntsv 3' +# 3 bad number cat << EOF > /tmp/databend_test_tsv_error3.txt insert into a(a,b,c) format TSV 2023-04-08 01:01:01 123Hello - EOF +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error3.txt | grep -c "column 2: bad field end" -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "create table a ( a datetime, b string, c int)" - -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error1.txt -echo -e '\n' -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error2.txt -echo -e '\n' -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_csv_error3.txt -echo -e '\n' - -curl -s 
-u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error1.txt -echo -e '\n' -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error2.txt -echo -e '\n' -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" --data-binary @/tmp/databend_test_tsv_error3.txt - +# cleanup curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "drop table a" - -rm /tmp/databend_test*.txt +# rm /tmp/databend_test*.txt From 007365f1a0a485fcac15ecff77308b2bdeafeb70 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 18:25:52 +0800 Subject: [PATCH 17/24] update statement test result --- src/query/ast/tests/it/testdata/statement.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/ast/tests/it/testdata/statement.txt b/src/query/ast/tests/it/testdata/statement.txt index 25c91e83ee74c..1dd1c8075b5a1 100644 --- a/src/query/ast/tests/it/testdata/statement.txt +++ b/src/query/ast/tests/it/testdata/statement.txt @@ -4360,6 +4360,7 @@ Insert( source: Streaming { format: "json", rest_str: ";", + start: 31, }, overwrite: false, }, From 1b176453ff6b217ec10bf65b030f16fed33c16f9 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:14:28 +0800 Subject: [PATCH 18/24] test(format): update unit test for error message --- src/query/service/tests/it/servers/http/clickhouse_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/tests/it/servers/http/clickhouse_handler.rs b/src/query/service/tests/it/servers/http/clickhouse_handler.rs index 2aa2585316c46..62c7b91c75423 100644 --- a/src/query/service/tests/it/servers/http/clickhouse_handler.rs +++ b/src/query/service/tests/it/servers/http/clickhouse_handler.rs @@ -271,7 +271,7 @@ async fn 
test_insert_format_ndjson() -> PoemResult<()> { .post("insert into table t1 format JSONEachRow", &body) .await; assert_eq!(status, StatusCode::INTERNAL_SERVER_ERROR); - assert_error!(body, "column a"); + assert_error!(body, "column=a"); } Ok(()) } From 9bc813e19712dc36ba8255907fa29568eada6340 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:15:01 +0800 Subject: [PATCH 19/24] feat(CSV): trim leading spaces. --- src/meta/types/src/user_stage.rs | 2 +- .../sources/input_formats/impls/input_format_csv.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/meta/types/src/user_stage.rs b/src/meta/types/src/user_stage.rs index b2ebcfe3567f9..657b0f6c6eaa0 100644 --- a/src/meta/types/src/user_stage.rs +++ b/src/meta/types/src/user_stage.rs @@ -137,7 +137,7 @@ impl FromStr for StageFileFormatType { "CSV" => Ok(StageFileFormatType::Csv), "TSV" | "TABSEPARATED" => Ok(StageFileFormatType::Tsv), "JSON" => Ok(StageFileFormatType::Json), - "NDJSON" => Ok(StageFileFormatType::NdJson), + "NDJSON" | "JSONEACHROW" => Ok(StageFileFormatType::NdJson), "AVRO" => Ok(StageFileFormatType::Avro), "ORC" => Ok(StageFileFormatType::Orc), "PARQUET" => Ok(StageFileFormatType::Parquet), diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs index ee673b86e1bd0..093f142671222 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_csv.rs @@ -49,11 +49,11 @@ impl InputFormatCSV { for (c, deserializer) in deserializers.iter_mut().enumerate() { let field_end = field_ends[c]; let col_data = &buf[field_start..field_end]; - if col_data.is_empty() { + let mut reader = NestedCheckpointReader::new(col_data); + reader.ignore_white_spaces().expect("must success"); + if 
reader.eof().expect("must success") { deserializer.de_default(format_settings); } else { - let mut reader = NestedCheckpointReader::new(col_data); - // reader.ignores(|c: u8| c == b' ').expect("must success"); // todo(youngsofun): do not need escape, already done in csv-core if let Err(e) = deserializer.de_text(&mut reader, format_settings) { let err_msg = format_column_error(c, col_data, &e.message()); From 895bedfff6bcb3ab4355587497e195f09b98bce4 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:33:03 +0800 Subject: [PATCH 20/24] test(CSV): update explain.test for typo. --- tests/logictest/suites/mode/standalone/explain/explain.test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/logictest/suites/mode/standalone/explain/explain.test b/tests/logictest/suites/mode/standalone/explain/explain.test index 12fd9425fecd0..902d2a964150e 100644 --- a/tests/logictest/suites/mode/standalone/explain/explain.test +++ b/tests/logictest/suites/mode/standalone/explain/explain.test @@ -381,7 +381,7 @@ Insert (children 3) │ ├── Identifier a │ └── Identifier b └── Source (children 1) - └── ValueSouce + └── ValueSource statement query T explain ast delete from t1 where a > 100 and b > 1 and b < 10; From 4b79cec979e9b66479d7bdbb10214d619f863880 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:34:16 +0800 Subject: [PATCH 21/24] test(format): up format_diagnostic.sh, match result with key word. 
--- .../0_stateless/20+_others/20_0009_format_diagnostic.result | 2 +- .../suites/0_stateless/20+_others/20_0009_format_diagnostic.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.result b/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.result index c49be78cbee8b..d00491fd7e5bb 100644 --- a/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.result +++ b/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.result @@ -1 +1 @@ -execute fail: fail to parse CSV databend_test_csv2.txt:2 fail to decode column 0: "Cannot parse value:2023-023-0 to Date type, cause: input contains invalid characters", [column_data]=["2023-023-02 02:03:02 World 123"] \ No newline at end of file +1 diff --git a/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.sh b/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.sh index 0d505a293228a..2cd705749e761 100755 --- a/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.sh +++ b/tests/suites/0_stateless/20+_others/20_0009_format_diagnostic.sh @@ -20,6 +20,6 @@ echo "drop table if exists a;" | $MYSQL_CLIENT_CONNECT echo "create table a ( a datetime, b string, c int);" | $MYSQL_CLIENT_CONNECT -curl -sH "insert_sql:insert into a format Csv" -H "skip_header:0" -u root: -F "upload=@/tmp/databend_test_csv1.txt" -F "upload=@/tmp/databend_test_csv2.txt" -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | head -c 246 +curl -sH "insert_sql:insert into a format Csv" -H "skip_header:0" -u root: -F "upload=@/tmp/databend_test_csv1.txt" -F "upload=@/tmp/databend_test_csv2.txt" -XPUT "http://localhost:${QUERY_HTTP_HANDLER_PORT}/v1/streaming_load" | grep -c "Date type" echo "drop table a;" | $MYSQL_CLIENT_CONNECT From 79e69e163134e21954b23bba16b763ea47ca02b5 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:34:43 +0800 Subject: [PATCH 22/24] test(format): delete tmp files. 
--- .../14_0007_http_clickhouse_input_format_diagnostic.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh index 4c72bd9b0095f..cbaad0f3c1115 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh @@ -59,4 +59,4 @@ curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT # cleanup curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "drop table a" -# rm /tmp/databend_test*.txt +rm /tmp/databend_test*.txt From fcdb9f42e599ead40ac648e8c48f18ea81cbef03 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 19:35:51 +0800 Subject: [PATCH 23/24] rm enable_planner_v2. Co-authored-by: Xuanwo --- .../14_0007_http_clickhouse_input_format_diagnostic.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh index cbaad0f3c1115..e5cdae37264df 100755 --- a/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh +++ b/tests/suites/0_stateless/14_clickhouse_http_handler/14_0007_http_clickhouse_input_format_diagnostic.sh @@ -3,7 +3,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
"$CURDIR"/../../../shell_env.sh -curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "drop table if exists a" +curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/" -d "drop table if exists a" curl -s -u 'root:' -XPOST "http://localhost:${QUERY_CLICKHOUSE_HTTP_HANDLER_PORT}/?enable_planner_v2=1" -d "create table a ( a datetime, b string, c int)" # 1 bad date From 402be582d5cad42aa8b25871c053f0eac1d59a45 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 23 Sep 2022 20:14:57 +0800 Subject: [PATCH 24/24] test(format): update result, csv trim field leading spaces. --- tests/suites/0_stateless/13_tpch/13_0002_q2.result | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/suites/0_stateless/13_tpch/13_0002_q2.result b/tests/suites/0_stateless/13_tpch/13_0002_q2.result index b02d01b36d415..3423a92dbe670 100644 --- a/tests/suites/0_stateless/13_tpch/13_0002_q2.result +++ b/tests/suites/0_stateless/13_tpch/13_0002_q2.result @@ -13,16 +13,16 @@ 8271.39 Supplier#000000146 RUSSIA 4637 Manufacturer#5 rBDNgCr04x0sfdzD5,gFOutCiG2 32-792-619-3155 s cajole quickly special requests. quickly enticing theodolites h 8096.98 Supplier#000000574 RUSSIA 323 Manufacturer#4 2O8 sy9g2mlBOuEjzj0pA2pevk, 32-866-246-8752 ully after the regular requests. slyly final dependencies wake slyly along the busy deposit 7392.78 Supplier#000000170 UNITED KINGDOM 7655 Manufacturer#2 RtsXQ,SunkA XHy9 33-803-340-5398 ake carefully across the quickly -7205.2 Supplier#000000477 GERMANY 10956 Manufacturer#5 VtaNKN5Mqui5yh7j2ldd5waf 17-180-144-7991 excuses wake express deposits. furiously careful asymptotes according to the carefull +7205.2 Supplier#000000477 GERMANY 10956 Manufacturer#5 VtaNKN5Mqui5yh7j2ldd5waf 17-180-144-7991 excuses wake express deposits. 
furiously careful asymptotes according to the carefull 6820.35 Supplier#000000007 UNITED KINGDOM 13217 Manufacturer#5 s,4TicNGB4uO6PaSqNBUq 33-990-965-2201 s unwind silently furiously regular courts. final requests are deposits. requests wake quietly blit 6721.7 Supplier#000000954 FRANCE 4191 Manufacturer#3 P3O5p UFz1QsLmZX 16-537-341-8517 ect blithely blithely final acco -6329.9 Supplier#000000996 GERMANY 10735 Manufacturer#2 Wx4dQwOAwWjfSCGupfrM 17-447-811-3282 ironic forges cajole blithely agai +6329.9 Supplier#000000996 GERMANY 10735 Manufacturer#2 Wx4dQwOAwWjfSCGupfrM 17-447-811-3282 ironic forges cajole blithely agai 6173.87 Supplier#000000408 RUSSIA 18139 Manufacturer#1 qcor1u,vJXAokjnL5,dilyYNmh 32-858-724-2950 blithely pending packages cajole furiously slyly pending notornis. slyly final -5364.99 Supplier#000000785 RUSSIA 13784 Manufacturer#4 W VkHBpQyD3qjQjWGpWicOpmILFehmEdWy67kUGY 32-297-653-2203 packages boost carefully. express ideas along +5364.99 Supplier#000000785 RUSSIA 13784 Manufacturer#4 W VkHBpQyD3qjQjWGpWicOpmILFehmEdWy67kUGY 32-297-653-2203 packages boost carefully. express ideas along 5069.27 Supplier#000000328 GERMANY 16327 Manufacturer#1 SMm24d WG62 17-231-513-5721 he unusual ideas. slyly final packages a 4941.88 Supplier#000000321 ROMANIA 7320 Manufacturer#5 pLngFl5yeMcHyov 29-573-279-1406 y final requests impress s 4672.25 Supplier#000000239 RUSSIA 12238 Manufacturer#1 XO101kgHrJagK2FL1U6QCaTE ncCsMbeuTgK6o8 32-396-654-6826 arls wake furiously deposits. even, regular depen -4586.49 Supplier#000000680 RUSSIA 5679 Manufacturer#3 UhvDfdEfJh,Qbe7VZb8uSGO2TU 0jEa6nXZXE 32-522-382-1620 the regularly regular dependencies. carefully bold excuses under th +4586.49 Supplier#000000680 RUSSIA 5679 Manufacturer#3 UhvDfdEfJh,Qbe7VZb8uSGO2TU 0jEa6nXZXE 32-522-382-1620 the regularly regular dependencies. 
carefully bold excuses under th 4518.31 Supplier#000000149 FRANCE 18344 Manufacturer#5 pVyWsjOidpHKp4NfKU4yLeym 16-660-553-2456 ts detect along the foxes. final Tiresias are. idly pending deposits haggle; even, blithe pin 4315.15 Supplier#000000509 FRANCE 18972 Manufacturer#2 SF7dR8V5pK 16-298-154-3365 ronic orbits are furiously across the requests. quickly express ideas across the special, bold 3526.53 Supplier#000000553 FRANCE 8036 Manufacturer#4 a,liVofXbCJ 16-599-552-3755 lar dinos nag slyly brave @@ -36,9 +36,9 @@ 765.69 Supplier#000000799 RUSSIA 11276 Manufacturer#2 jwFN7ZB3T9sMF 32-579-339-1495 nusual requests. furiously unusual epitaphs integrate. slyly 727.89 Supplier#000000470 ROMANIA 6213 Manufacturer#3 XckbzsAgBLbUkdfjgJEPjmUMTM8ebSMEvI 29-165-289-1523 gular excuses. furiously regular excuses sleep slyly caref 683.07 Supplier#000000651 RUSSIA 4888 Manufacturer#4 oWekiBV6s,1g 32-181-426-4490 ly regular requests cajole abou -167.56 Supplier#000000290 FRANCE 2037 Manufacturer#1 6Bk06GVtwZaKqg01 16-675-286-5102 the theodolites. ironic, ironic deposits above +167.56 Supplier#000000290 FRANCE 2037 Manufacturer#1 6Bk06GVtwZaKqg01 16-675-286-5102 the theodolites. ironic, ironic deposits above 91.39 Supplier#000000949 UNITED KINGDOM 9430 Manufacturer#2 a,UE,6nRVl2fCphkOoetR1ajIzAEJ1Aa1G1HV 33-332-697-2768 pinto beans. carefully express requests hagg --314.06 Supplier#000000510 ROMANIA 17242 Manufacturer#4 VmXQl ,vY8JiEseo8Mv4zscvNCfsY 29-207-852-3454 bold deposits. carefully even d +-314.06 Supplier#000000510 ROMANIA 17242 Manufacturer#4 VmXQl ,vY8JiEseo8Mv4zscvNCfsY 29-207-852-3454 bold deposits. carefully even d -820.89 Supplier#000000409 GERMANY 2156 Manufacturer#5 LyXUYFz7aXrvy65kKAbTatGzGS,NDBcdtD 17-719-517-9836 y final, slow theodolites. furiously regular req -845.44 Supplier#000000704 ROMANIA 9926 Manufacturer#5 hQvlBqbqqnA5Dgo1BffRBX78tkkRu 29-300-896-5991 ctions. 
carefully sly requ -942.73 Supplier#000000563 GERMANY 5797 Manufacturer#1 Rc7U1cRUhYs03JD 17-108-537-2691 slyly furiously final decoys; silent, special realms poach f