Skip to content

Commit

Permalink
feat: CSV add option empty_field_as. (databendlabs#14269)
Browse files Browse the repository at this point in the history
* feat: CSV add option empty_field_as.

* fix.

* fix reading header only csv file.

* fix date in META_CHANGE_LOG.
  • Loading branch information
youngsofun authored Jan 9, 2024
1 parent 3a107d6 commit 4b0bbe0
Show file tree
Hide file tree
Showing 22 changed files with 317 additions and 22 deletions.
9 changes: 9 additions & 0 deletions src/common/storage/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ pub enum FileParseError {
column_name: String,
column_type: String,
},
#[error(
"Empty value for column {column_index} ({column_name} {column_type}), when option empty_field_as='{empty_field_as}'"
)]
ColumnEmptyError {
column_index: usize,
column_name: String,
column_type: String,
empty_field_as: String,
},
#[error(
"Invalid value '{column_data}' for column {column_index} ({column_name} {column_type}): {size_remained} bytes remained, next_char at {error_pos} is {next_char}"
)]
Expand Down
43 changes: 43 additions & 0 deletions src/meta/app/src/principal/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const OPT_ROW_TAG: &str = "row_tag";
const OPT_ERROR_ON_COLUMN_COUNT_MISMATCH: &str = "error_on_column_count_mismatch";
const MISSING_FIELD_AS: &str = "missing_field_as";
const NULL_FIELD_AS: &str = "null_field_as";
const OPT_EMPTY_FIELD_AS: &str = "empty_field_as";

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileFormatOptionsAst {
Expand Down Expand Up @@ -200,6 +201,12 @@ impl FileFormatParams {
let escape = ast.take_string(OPT_ESCAPE, default.escape);
let quote = ast.take_string(OPT_QUOTE, default.quote);
let null_display = ast.take_string(OPT_NULL_DISPLAY, default.null_display);
let empty_field_as = ast
.options
.remove(OPT_EMPTY_FIELD_AS)
.map(|s| EmptyFieldAs::from_str(&s))
.transpose()?
.unwrap_or_default();
let error_on_column_count_mismatch = ast.take_bool(
OPT_ERROR_ON_COLUMN_COUNT_MISMATCH,
default.error_on_column_count_mismatch,
Expand All @@ -214,6 +221,7 @@ impl FileFormatParams {
escape,
quote,
error_on_column_count_mismatch,
empty_field_as,
})
}
StageFileFormatType::Tsv => {
Expand Down Expand Up @@ -309,6 +317,7 @@ pub struct CsvFileFormatParams {
pub escape: String,
pub quote: String,
pub error_on_column_count_mismatch: bool,
pub empty_field_as: EmptyFieldAs,
}

impl Default for CsvFileFormatParams {
Expand All @@ -323,6 +332,7 @@ impl Default for CsvFileFormatParams {
escape: "".to_string(),
quote: "\"".to_string(),
error_on_column_count_mismatch: true,
empty_field_as: Default::default(),
}
}
}
Expand Down Expand Up @@ -409,6 +419,39 @@ pub enum NullAs {
TypeDefault,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum EmptyFieldAs {
#[default]
FieldDefault,
Null,
String,
}

impl FromStr for EmptyFieldAs {
type Err = ErrorCode;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"string" => Ok(Self::String),
"null" => Ok(Self::Null),
"field_default" => Ok(Self::FieldDefault),
_ => Err(ErrorCode::InvalidArgument(format!(
"invalid value ({s}) for empty_field_as, available values field_default|null|string."
))),
}
}
}

impl Display for EmptyFieldAs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::FieldDefault => write!(f, "field_default"),
Self::Null => write!(f, "null"),
Self::String => write!(f, "string"),
}
}
}

impl NullAs {
fn parse(s: Option<&str>, option_name: &str, default: Self) -> Result<Self> {
match s {
Expand Down
15 changes: 15 additions & 0 deletions src/meta/proto-conv/src/file_format_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
//! This mod is the key point about compatibility.
//! Everytime update anything in this file, update the `VER` and let the tests pass.
use std::str::FromStr;

use databend_common_meta_app as mt;
use databend_common_meta_app::principal::EmptyFieldAs;
use databend_common_meta_app::principal::NullAs;
use databend_common_protos::pb;
use num::FromPrimitive;
Expand Down Expand Up @@ -422,6 +425,16 @@ impl FromToProto for mt::principal::CsvFileFormatParams {
} else {
p.null_display
};

let empty_field_as = p
.empty_field_as
.map(|s| {
EmptyFieldAs::from_str(&s).map_err(|e| Incompatible {
reason: format!("{:?}", e),
})
})
.transpose()?
.unwrap_or_default();
Ok(Self {
compression,
headers: p.headers,
Expand All @@ -432,6 +445,7 @@ impl FromToProto for mt::principal::CsvFileFormatParams {
nan_display: p.nan_display,
null_display,
error_on_column_count_mismatch: !p.allow_column_count_mismatch,
empty_field_as,
})
}

Expand All @@ -449,6 +463,7 @@ impl FromToProto for mt::principal::CsvFileFormatParams {
nan_display: self.nan_display.clone(),
null_display: self.null_display.clone(),
allow_column_count_mismatch: !self.error_on_column_count_mismatch,
empty_field_as: Some(self.empty_field_as.to_string()),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(69, "2023-12-21: Add: user.proto/GrantTableIdObject and GrantDatabaseIdObject", ),
(70, "2023-12-25: Add: datatype.proto Binary type", ),
(71, "2024-01-02: Add: user.proto/password options", ),
(72, "2024-01-09: Add: user.proto/CSVFileFormatParams add field `empty_field_as`", ),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ mod v068_index_meta;
mod v069_user_grant_id;
mod v070_binary_type;
mod v071_user_password;
mod v072_csv_format_params;
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v032_file_format_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fn test_decode_v32_csv_file_format_params() -> anyhow::Result<()> {
escape: "\\".to_string(),
quote: "\'".to_string(),
error_on_column_count_mismatch: true,
empty_field_as: Default::default(),
})
};
common::test_load_old(func_name!(), file_format_params_v32.as_slice(), 0, want())?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v053_csv_format_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn test_decode_v53_csv_file_format_params() -> anyhow::Result<()> {
escape: "\\".to_string(),
quote: "\'".to_string(),
error_on_column_count_mismatch: true,
empty_field_as: Default::default(),
})
};
common::test_load_old(func_name!(), file_format_params_v32.as_slice(), 0, want())?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/tests/it/v059_csv_format_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn test_decode_v59_csv_file_format_params() -> anyhow::Result<()> {
escape: "\\".to_string(),
quote: "\'".to_string(),
error_on_column_count_mismatch: false,
empty_field_as: Default::default(),
})
};
common::test_load_old(func_name!(), file_format_params_v59.as_slice(), 0, want())?;
Expand Down
56 changes: 56 additions & 0 deletions src/meta/proto-conv/tests/it/v072_csv_format_params.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_app as mt;
use databend_common_meta_app::principal::CsvFileFormatParams;
use databend_common_meta_app::principal::EmptyFieldAs;
use databend_common_meta_app::principal::StageFileCompression;
use minitrace::func_name;

use crate::common;

// These bytes are built when a new version in introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
#[test]
fn test_decode_v72_csv_file_format_params() -> anyhow::Result<()> {
let file_format_params_v72 = vec![
18, 48, 8, 1, 16, 1, 26, 2, 102, 100, 34, 2, 114, 100, 42, 6, 109, 121, 95, 110, 97, 110,
50, 1, 124, 58, 1, 39, 66, 4, 78, 117, 108, 108, 72, 1, 82, 6, 115, 116, 114, 105, 110,
103, 160, 6, 72, 168, 6, 24,
];
let want = || {
mt::principal::FileFormatParams::Csv(CsvFileFormatParams {
compression: StageFileCompression::Gzip,
headers: 1,
field_delimiter: "fd".to_string(),
record_delimiter: "rd".to_string(),
null_display: "Null".to_string(),
nan_display: "my_nan".to_string(),
escape: "|".to_string(),
quote: "\'".to_string(),
error_on_column_count_mismatch: false,
empty_field_as: EmptyFieldAs::String,
})
};
common::test_load_old(func_name!(), file_format_params_v72.as_slice(), 0, want())?;
common::test_pb_from_to(func_name!(), want())?;
Ok(())
}
1 change: 1 addition & 0 deletions src/meta/protos/proto/file_format.proto
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ message CsvFileFormatParams {
// corresponding to `!error_on_column_count_mismatch`
// for we can not set default value in proto3
bool allow_column_count_mismatch = 9;
optional string empty_field_as = 10;
}

message TsvFileFormatParams {
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/parser/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub fn format_options(i: Input) -> IResult<BTreeMap<String, String>> {
| QUOTE
| NAN_DISPLAY
| NULL_DISPLAY
| EMPTY_FIELD_AS
| ESCAPE
| NULL_FIELD_AS
| MISSING_FIELD_AS
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ pub enum TokenKind {
EXCLUDE,
#[token("ELSE", ignore(ascii_case))]
ELSE,
#[token("EMPTY_FIELD_AS", ignore(ascii_case))]
EMPTY_FIELD_AS,
#[token("ENABLE_VIRTUAL_HOST_STYLE", ignore(ascii_case))]
ENABLE_VIRTUAL_HOST_STYLE,
#[token("END", ignore(ascii_case))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::ColumnBuilder;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableSchemaRef;
use databend_common_formats::FieldDecoder;
use databend_common_formats::FileFormatOptionsExt;
use databend_common_formats::RecordDelimiter;
use databend_common_formats::SeparatedTextDecoder;
use databend_common_meta_app::principal::CsvFileFormatParams;
use databend_common_meta_app::principal::EmptyFieldAs;
use databend_common_meta_app::principal::FileFormatParams;
use databend_common_meta_app::principal::StageFileFormatType;
use databend_common_storage::FileParseError;
Expand Down Expand Up @@ -57,14 +59,46 @@ impl InputFormatCSV {
column_index: usize,
schema: &TableSchemaRef,
default_values: &Option<Vec<Scalar>>,
empty_filed_as: &EmptyFieldAs,
) -> std::result::Result<(), FileParseError> {
if col_data.is_empty() {
match default_values {
None => {
// query
builder.push_default();
}
Some(values) => {
builder.push(values[column_index].as_ref());
let field = &schema.fields()[column_index];
// copy
match empty_filed_as {
EmptyFieldAs::FieldDefault => {
builder.push(values[column_index].as_ref());
}
EmptyFieldAs::Null => {
if !matches!(field.data_type, TableDataType::Nullable(_)) {
return Err(FileParseError::ColumnEmptyError {
column_index,
column_name: field.name().to_owned(),
column_type: field.data_type.to_string(),
empty_field_as: empty_filed_as.to_string(),
});
}
builder.push_default();
}
EmptyFieldAs::String => {
if !matches!(field.data_type.remove_nullable(), TableDataType::String) {
let field = &schema.fields()[column_index];
return Err(FileParseError::ColumnEmptyError {
column_index,
column_name: field.name().to_owned(),
column_type: field.data_type.to_string(),
empty_field_as: empty_filed_as.to_string(),
});
}

builder.push_default();
}
}
}
}
return Ok(());
Expand All @@ -82,6 +116,7 @@ impl InputFormatCSV {
field_ends: &[usize],
columns_to_read: &Option<Vec<usize>>,
default_values: &Option<Vec<Scalar>>,
empty_filed_as: &EmptyFieldAs,
) -> std::result::Result<(), FileParseError> {
if let Some(columns_to_read) = columns_to_read {
for c in columns_to_read {
Expand All @@ -98,6 +133,7 @@ impl InputFormatCSV {
*c,
schema,
default_values,
empty_filed_as,
)?;
}
}
Expand All @@ -106,7 +142,15 @@ impl InputFormatCSV {
for (c, column) in columns.iter_mut().enumerate() {
let field_end = field_ends[c];
let col_data = &buf[field_start..field_end];
Self::read_column(column, field_decoder, col_data, c, schema, default_values)?;
Self::read_column(
column,
field_decoder,
col_data,
c,
schema,
default_values,
empty_filed_as,
)?;
field_start = field_end;
}
}
Expand Down Expand Up @@ -182,6 +226,10 @@ impl InputFormatTextBase for InputFormatCSV {
.as_any()
.downcast_ref::<SeparatedTextDecoder>()
.expect("must success");
let format_params = match builder.ctx.file_format_params {
FileFormatParams::Csv(ref p) => p,
_ => unreachable!(),
};
for (i, end) in batch.row_ends.iter().enumerate() {
let num_fields = batch.num_fields[i];
let buf = &batch.data[start..*end];
Expand All @@ -193,6 +241,7 @@ impl InputFormatTextBase for InputFormatCSV {
&batch.field_ends[field_end_idx..field_end_idx + num_fields],
&builder.projection,
&builder.ctx.default_values,
&format_params.empty_field_as,
) {
builder.ctx.on_error(
e,
Expand Down Expand Up @@ -334,9 +383,11 @@ impl AligningStateTextBased for CsvReaderState {
let mut file_status = FileStatus::default();
let mut buf_out = vec![0u8; buf_in.len()];
while self.common.rows_to_skip > 0 {
let (_, n_in) = self.read_record(buf_in, &mut buf_out, &mut file_status)?;
let (res, n_in) = self.read_record(buf_in, &mut buf_out, &mut file_status)?;
buf_in = &buf_in[n_in..];
self.common.rows_to_skip -= 1;
if matches!(res, ReadRecordOutput::Record { .. }) {
self.common.rows_to_skip -= 1;
}
}

let mut buf_out_pos = 0usize;
Expand Down
Loading

0 comments on commit 4b0bbe0

Please sign in to comment.