Skip to content

Commit

Permalink
Merge pull request #7843 from youngsofun/fmt
Browse files Browse the repository at this point in the history
refactor(clickhouse_handler): use new InputFormat.
  • Loading branch information
mergify[bot] authored Sep 23, 2022
2 parents b37a123 + 3e02b29 commit b0539ac
Show file tree
Hide file tree
Showing 24 changed files with 313 additions and 222 deletions.
11 changes: 5 additions & 6 deletions src/meta/types/src/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,16 @@ impl FromStr for StageFileFormatType {
/// Parse a stage file format type from its name, case-insensitively.
///
/// ClickHouse-style aliases are accepted: `TABSEPARATED` for TSV and
/// `JSONEACHROW` for NDJSON.
fn from_str(s: &str) -> std::result::Result<Self, String> {
    match s.to_uppercase().as_str() {
        "CSV" => Ok(StageFileFormatType::Csv),
        "TSV" | "TABSEPARATED" => Ok(StageFileFormatType::Tsv),
        "JSON" => Ok(StageFileFormatType::Json),
        "NDJSON" | "JSONEACHROW" => Ok(StageFileFormatType::NdJson),
        "AVRO" => Ok(StageFileFormatType::Avro),
        "ORC" => Ok(StageFileFormatType::Orc),
        "PARQUET" => Ok(StageFileFormatType::Parquet),
        "XML" => Ok(StageFileFormatType::Xml),
        // List every accepted name so the error is actionable.
        _ => Err(format!(
            "Unknown file format type '{s}', must be one of (CSV | TSV | JSON | NDJSON | AVRO | ORC | PARQUET | XML)"
        )),
    }
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/ast/src/ast/format/ast_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,13 +955,13 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
fn visit_insert_source(&mut self, insert_source: &'ast InsertSource<'ast>) {
match insert_source {
InsertSource::Streaming { format, .. } => {
let streaming_name = format!("StreamSouce {}", format);
let streaming_name = format!("StreamSource {}", format);
let streaming_format_ctx = AstFormatContext::new(streaming_name);
let streaming_node = FormatTreeNode::new(streaming_format_ctx);
self.children.push(streaming_node);
}
InsertSource::Values { .. } => {
let values_name = "ValueSouce".to_string();
let values_name = "ValueSource".to_string();
let values_format_ctx = AstFormatContext::new(values_name);
let values_node = FormatTreeNode::new(values_format_ctx);
self.children.push(values_node);
Expand Down
9 changes: 7 additions & 2 deletions src/query/ast/src/ast/format/syntax/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,18 @@ pub(crate) fn pretty_insert(insert_stmt: InsertStmt) -> RcDoc {

fn pretty_source(source: InsertSource) -> RcDoc {
RcDoc::line().append(match source {
InsertSource::Streaming { format, rest_str } => RcDoc::text("FORMAT")
InsertSource::Streaming {
format,
rest_str,
start,
} => RcDoc::text("FORMAT")
.append(RcDoc::space())
.append(RcDoc::text(format))
.append(
RcDoc::line()
.nest(NEST_FACTOR)
.append(RcDoc::text(rest_str.to_string())),
.append(RcDoc::text(rest_str.to_string()))
.append(RcDoc::text(start.to_string())),
),
InsertSource::Values { rest_str } => RcDoc::text("VALUES").append(
RcDoc::line()
Expand Down
20 changes: 16 additions & 4 deletions src/query/ast/src/ast/statements/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,27 @@ impl Display for InsertStmt<'_> {

#[derive(Debug, Clone, PartialEq)]
pub enum InsertSource<'a> {
Streaming { format: String, rest_str: &'a str },
Values { rest_str: &'a str },
Select { query: Box<Query<'a>> },
Streaming {
format: String,
rest_str: &'a str,
start: usize,
},
Values {
rest_str: &'a str,
},
Select {
query: Box<Query<'a>>,
},
}

impl Display for InsertSource<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
InsertSource::Streaming { format, rest_str } => {
InsertSource::Streaming {
format,
rest_str,
start: _,
} => {
write!(f, "FORMAT {format} {rest_str}")
}
InsertSource::Values { rest_str } => write!(f, "VALUES {rest_str}"),
Expand Down
12 changes: 8 additions & 4 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -995,16 +995,17 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
rule! {
FORMAT ~ #ident ~ #rest_str
},
|(_, format, rest_str)| InsertSource::Streaming {
|(_, format, (rest_str, start))| InsertSource::Streaming {
format: format.name,
rest_str,
start,
},
);
let values = map(
rule! {
VALUES ~ #rest_str
},
|(_, rest_str)| InsertSource::Values { rest_str },
|(_, (rest_str, _))| InsertSource::Values { rest_str },
);
let query = map(query, |query| InsertSource::Select {
query: Box::new(query),
Expand All @@ -1018,13 +1019,16 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
}

#[allow(clippy::needless_lifetimes)]
pub fn rest_str<'a>(i: Input<'a>) -> IResult<&'a str> {
pub fn rest_str<'a>(i: Input<'a>) -> IResult<(&'a str, usize)> {
// It's safe to unwrap because input must contain EOI.
let first_token = i.0.first().unwrap();
let last_token = i.0.last().unwrap();
Ok((
i.slice((i.len() - 1)..),
&first_token.source[first_token.span.start..last_token.span.end],
(
&first_token.source[first_token.span.start..last_token.span.end],
first_token.span.start,
),
))
}

Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4360,6 +4360,7 @@ Insert(
source: Streaming {
format: "json",
rest_str: ";",
start: 31,
},
overwrite: false,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ use std::sync::Arc;
use common_datavalues::TypeDeserializer;
use common_exception::ErrorCode;
use common_exception::Result;
use common_formats::verbose_string;
use common_io::prelude::BufferReadExt;
use common_io::prelude::FormatSettings;
use common_io::prelude::NestedCheckpointReader;
use common_meta_types::StageFileFormatType;
use common_settings::Settings;
use csv_core::ReadRecordResult;

use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
use crate::processors::sources::input_formats::impls::input_format_tsv::format_column_error;
use crate::processors::sources::input_formats::input_format_text::get_time_zone;
use crate::processors::sources::input_formats::input_format_text::AligningState;
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
Expand All @@ -48,23 +49,21 @@ impl InputFormatCSV {
for (c, deserializer) in deserializers.iter_mut().enumerate() {
let field_end = field_ends[c];
let col_data = &buf[field_start..field_end];
if col_data.is_empty() {
let mut reader = NestedCheckpointReader::new(col_data);
reader.ignore_white_spaces().expect("must success");
if reader.eof().expect("must success") {
deserializer.de_default(format_settings);
} else {
let mut reader = NestedCheckpointReader::new(col_data);
// reader.ignores(|c: u8| c == b' ').expect("must success");
// todo(youngsofun): do not need escape, already done in csv-core
if let Err(e) = deserializer.de_text(&mut reader, format_settings) {
let mut value = String::new();
verbose_string(buf, &mut value);
let err_msg = format!(
"fail to decode column {}: {:?}, [column_data]=[{}]",
c,
e.message(),
value
);
let err_msg = format_column_error(c, col_data, &e.message());
return Err(csv_error(&err_msg, path, row_index));
};
reader.ignore_white_spaces().expect("must success");
if reader.must_eof().is_err() {
let err_msg = format_column_error(c, col_data, "bad field end");
return Err(csv_error(&err_msg, path, row_index));
}
}
field_start = field_end;
}
Expand Down Expand Up @@ -178,6 +177,14 @@ impl InputFormatTextBase for InputFormatCSV {
&state.path,
state.rows,
));
} else if endlen == num_fields + 1
&& field_ends[num_fields] != field_ends[num_fields - 1]
{
return Err(csv_error(
"CSV allow ending with ',', but should not have data after it",
&state.path,
state.rows,
));
}

state.rows_to_skip -= 1;
Expand Down Expand Up @@ -250,6 +257,14 @@ impl InputFormatTextBase for InputFormatCSV {
&state.path,
start_row + row_batch.row_ends.len(),
));
} else if endlen == num_fields + 1
&& field_ends[num_fields] != field_ends[num_fields - 1]
{
return Err(csv_error(
"CSV allow ending with ',', but should not have data after it",
&state.path,
start_row + row_batch.row_ends.len(),
));
}
row_batch
.field_ends
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ impl InputFormatTSV {
if let Err(e) =
deserializers[column_index].de_text(&mut reader, format_settings)
{
err_msg = Some(format!(
"fail to decode column {}: {:?}, [column_data]=[{}]",
column_index, e, ""
));
err_msg = Some(format_column_error(column_index, col_data, &e.message()));
break;
};
// todo(youngsofun): check remaining data
reader.ignore_white_spaces().expect("must success");
if reader.must_eof().is_err() {
err_msg =
Some(format_column_error(column_index, col_data, "bad field end"));
break;
}
}
column_index += 1;
field_start = pos + 1;
Expand All @@ -76,14 +78,14 @@ impl InputFormatTSV {
}
pos += 1;
}
if column_index < num_columns - 1 {
if err_msg.is_none() && column_index < num_columns {
// todo(youngsofun): allow it optionally (set default)
err_msg = Some(format!(
"need {} columns, find {} only",
num_columns,
column_index + 1
num_columns, column_index
));
}

if let Some(m) = err_msg {
let row_info = if let Some(r) = row_index {
format!("at row {},", r)
Expand Down Expand Up @@ -158,3 +160,12 @@ impl InputFormatTextBase for InputFormatTSV {
Ok(state.align_by_record_delimiter(buf))
}
}

/// Build a uniform decode-failure message for one column, rendering the
/// offending bytes in printable form via `verbose_string`.
pub fn format_column_error(column_index: usize, col_data: &[u8], msg: &str) -> String {
    let mut printable = String::new();
    verbose_string(col_data, &mut printable);
    format!("fail to decode column {column_index}: {msg}, [column_data]=[{printable}]")
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ const MIN_ROW_PER_BLOCK: usize = 800 * 1000;
/// How the input data of a load is supplied.
pub enum InputPlan {
    /// `COPY INTO` from staged files, described by `CopyIntoPlan`.
    CopyInto(Box<CopyIntoPlan>),
    /// A streaming load, described by `StreamPlan`.
    StreamingLoad(StreamPlan),
}

impl InputPlan {
Expand All @@ -69,6 +68,7 @@ pub struct CopyIntoPlan {

#[derive(Debug)]
pub struct StreamPlan {
    /// Whether the stream arrives as multiple parts; a multi-part stream is
    /// aligned with more parallelism than a single-part one.
    pub is_multi_part: bool,
    /// Compression of the incoming stream (`Auto` when not specified).
    pub compression: StageFileCompression,
}

Expand Down Expand Up @@ -215,7 +215,11 @@ impl InputContext {
settings: Arc<Settings>,
schema: DataSchemaRef,
scan_progress: Arc<Progress>,
is_multi_part: bool,
) -> Result<Self> {
let (format_name, rows_to_skip) = remove_clickhouse_format_suffix(format_name);
let rows_to_skip = std::cmp::max(settings.get_skip_header()? as usize, rows_to_skip);

let format_type =
StageFileFormatType::from_str(format_name).map_err(ErrorCode::UnknownFormat)?;
let format = Self::get_input_format(&format_type)?;
Expand All @@ -231,14 +235,16 @@ impl InputContext {
}
};
let record_delimiter = RecordDelimiter::try_from(&settings.get_record_delimiter()?[..])?;
let rows_to_skip = settings.get_skip_header()? as usize;
let compression = settings.get_compression()?;
let compression = if !compression.is_empty() {
StageFileCompression::from_str(&compression).map_err(ErrorCode::BadArguments)?
} else {
StageFileCompression::Auto
};
let plan = StreamPlan { compression };
let plan = StreamPlan {
is_multi_part,
compression,
};

Ok(InputContext {
format,
Expand Down Expand Up @@ -294,7 +300,6 @@ impl InputContext {
let opt = match &self.plan {
InputPlan::CopyInto(p) => p.stage_info.file_format_options.compression,
InputPlan::StreamingLoad(p) => p.compression,
_ => StageFileCompression::None,
};
Self::get_compression_alg_copy(opt, path)
}
Expand Down Expand Up @@ -325,3 +330,18 @@ impl InputContext {
Ok(compression_algo)
}
}

const WITH_NAMES_AND_TYPES: &str = "withnamesandtypes";
const WITH_NAMES: &str = "withnames";

/// Strip a ClickHouse-style `WithNames` / `WithNamesAndTypes` suffix
/// (case-insensitive) from a format name.
///
/// Returns the base format name and the number of header rows implied by
/// the suffix: 2 for `WithNamesAndTypes`, 1 for `WithNames`, 0 otherwise.
fn remove_clickhouse_format_suffix(name: &str) -> (&str, usize) {
    // Case-insensitive ASCII suffix test on the raw bytes. Comparing bytes
    // avoids `to_lowercase()`, whose result can have a different byte
    // length than the input for non-ASCII text, which would make slicing
    // `name` by the lowercased length cut at the wrong byte or panic.
    fn ends_with_ignore_case(name: &str, suffix: &str) -> bool {
        name.len() >= suffix.len()
            && name.as_bytes()[name.len() - suffix.len()..].eq_ignore_ascii_case(suffix.as_bytes())
    }

    let (suf_len, skip) = if ends_with_ignore_case(name, WITH_NAMES_AND_TYPES) {
        (WITH_NAMES_AND_TYPES.len(), 2)
    } else if ends_with_ignore_case(name, WITH_NAMES) {
        (WITH_NAMES.len(), 1)
    } else {
        (0, 0)
    };
    // A matched suffix is pure ASCII, so `name.len() - suf_len` is always a
    // valid char boundary and this slice cannot panic.
    (&name[..name.len() - suf_len], skip)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use opendal::io_util::CompressAlgorithm;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_context::InputPlan;
use crate::processors::sources::input_formats::input_context::StreamPlan;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::source_aligner::Aligner;
use crate::processors::sources::input_formats::source_deserializer::DeserializeSource;
Expand Down Expand Up @@ -210,8 +211,13 @@ pub trait InputFormatPipe: Sized + Send + 'static {
let n_threads = ctx.settings.get_max_threads()? as usize;
let max_aligner = match ctx.plan {
InputPlan::CopyInto(_) => ctx.splits.len(),
InputPlan::StreamingLoad(_) => 3,
InputPlan::Clickhouse => 1,
InputPlan::StreamingLoad(StreamPlan { is_multi_part, .. }) => {
if is_multi_part {
3
} else {
1
}
}
};
let (row_batch_tx, row_batch_rx) = crossbeam_channel::bounded(n_threads);
for _ in 0..std::cmp::min(max_aligner, n_threads) {
Expand Down
Loading

1 comment on commit b0539ac

@vercel
Copy link

@vercel vercel bot commented on b0539ac Sep 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.