Skip to content

Commit

Permalink
feat(query): new input format framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Sep 16, 2022
1 parent 75273b0 commit 38881e4
Show file tree
Hide file tree
Showing 15 changed files with 1,496 additions and 0 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/query/pipeline/sources/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,22 @@ doctest = false
test = false

[dependencies]
async-channel = "1.7.1"
common-arrow = { path = "../../../common/arrow" }
common-base = { path = "../../../common/base" }
common-catalog = { path = "../../catalog" }
common-datablocks = { path = "../../datablocks" }
common-datavalues = { path = "../../datavalues" }
common-exception = { path = "../../../common/exception" }
common-formats = { path = "../../formats" }
common-io = { path = "../../../common/io" }
common-meta-types = { path = "../../../meta/types" }
common-pipeline-core = { path = "../core" }
common-settings = { path = "../../settings" }
common-storage = { path = "../../../common/storage" }
common-streams = { path = "../../streams" }

tracing = "0.1.35"
async-trait = { version = "0.1.0", package = "async-trait-fn" }
futures = "0.3.21"
futures-util = "0.3.21"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_exception::ErrorCode;
use common_exception::Result;

pub enum RecordDelimiter {
CRLF,
Any(u8),
}

impl RecordDelimiter {
pub fn end(&self) -> u8 {
match self {
RecordDelimiter::CRLF => b'\n',
RecordDelimiter::Any(b) => *b,
}
}
}

impl TryFrom<&str> for RecordDelimiter {
type Error = ErrorCode;
fn try_from(s: &str) -> Result<Self> {
match s.len() {
1 => Ok(RecordDelimiter::Any(s.as_bytes()[0])),
2 if s.eq("\r\n") => Ok(RecordDelimiter::CRLF),
_ => Err(ErrorCode::InvalidArgument(format!(
"bad RecordDelimiter: '{}'",
s
))),
}
}
}

impl Default for RecordDelimiter {
fn default() -> Self {
RecordDelimiter::CRLF
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use common_arrow::parquet::metadata::RowGroupMetaData;
use common_base::base::tokio::sync::mpsc::Receiver;
use common_datablocks::DataBlock;
use common_exception::Result;
use common_pipeline_core::Pipeline;
use opendal::Object;

use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::InputData;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
use crate::processors::sources::input_formats::InputFormat;

struct InputFormatParquet;

#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
async fn read_file_meta(
&self,
obj: &Object,
size: usize,
) -> Result<Option<Arc<dyn InputData>>> {
todo!()
}

async fn read_split_meta(
&self,
obj: &Object,
split_info: &SplitInfo,
) -> Result<Option<Box<dyn InputData>>> {
todo!()
}

fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
todo!()
}

fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
todo!()
}

fn exec_stream(
&self,
ctx: Arc<InputContext>,
pipeline: &mut Pipeline,
input: Receiver<StreamingReadBatch>,
) -> Result<()> {
todo!()
}
}

pub struct ParquetFormatPipe;

#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
type ReadBatch = ReadBatch;
type RowBatch = RowGroupInMemory;
type AligningState = AligningState;
type BlockBuilder = ParquetBlockBuilder;
}

pub struct SplitMeta {
row_groups: Vec<RowGroupMetaData>,
}

pub struct RowGroupInMemory {}

impl Debug for RowGroupInMemory {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RowGroupInMemory")
}
}

#[derive(Debug)]
pub enum ReadBatch {
Buffer(Vec<u8>),
RowGroup(RowGroupInMemory),
}

impl From<Vec<u8>> for ReadBatch {
fn from(v: Vec<u8>) -> Self {
Self::Buffer(v)
}
}

pub struct ParquetBlockBuilder {
ctx: Arc<InputContext>,
}

impl BlockBuilderTrait for ParquetBlockBuilder {
type Pipe = ParquetFormatPipe;

fn create(ctx: Arc<InputContext>) -> Self {
ParquetBlockBuilder { ctx }
}

fn deserialize(&mut self, batch: Option<RowGroupInMemory>) -> Result<Vec<DataBlock>> {
todo!()
}
}

pub struct AligningState {
buffers: Vec<Vec<u8>>,
}

impl AligningStateTrait for AligningState {
type Pipe = ParquetFormatPipe;

fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
todo!()
}

fn align(&mut self, read_batch: Option<ReadBatch>) -> Result<Vec<RowGroupInMemory>> {
todo!()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::TypeDeserializer;
use common_exception::ErrorCode;
use common_exception::Result;
use common_io::prelude::BufferReadExt;
use common_io::prelude::FormatSettings;
use common_io::prelude::NestedCheckpointReader;

use crate::processors::sources::input_formats::input_format_text::AligningState;
use crate::processors::sources::input_formats::input_format_text::BlockBuilder;
use crate::processors::sources::input_formats::input_format_text::InputFormatTextBase;
use crate::processors::sources::input_formats::input_format_text::RowBatch;

pub struct InputFormatTSV {}

impl InputFormatTSV {
fn read_row(
buf: &[u8],
deserializers: &mut Vec<common_datavalues::TypeDeserializerImpl>,
format_settings: &FormatSettings,
path: &str,
offset: usize,
row_index: Option<usize>,
) -> Result<()> {
let len = deserializers.len();
let mut n_col = 0;
let mut field_start = 0;
let mut pos = 0;
let mut err_msg = None;
for b in buf.iter() {
if *b == b'\t' {
let col_data = &buf[field_start..pos];
if col_data.is_empty() {
deserializers[n_col].de_default(format_settings);
} else {
let mut reader = NestedCheckpointReader::new(col_data);
reader.ignores(|c: u8| c == b' ').expect("must success");
if let Err(e) = deserializers[n_col].de_text(&mut reader, format_settings) {
err_msg = Some(format!(
"fail to decode column {}: {:?}, [column_data]=[{}]",
n_col, e, ""
));
break;
};
// todo(youngsofun): check remaining data
}
n_col += 1;
field_start = pos + 1;
if n_col > len {
err_msg = Some("too many columns".to_string());
break;
}
}
pos += 1;
}
if n_col < len {
// todo(youngsofun): allow it optionally (set default)
err_msg = Some(format!("need {} columns, find {} only", len, n_col).to_string());
}
if let Some(m) = err_msg {
let row_info = if let Some(r) = row_index {
format!("at row {},", r)
} else {
String::new()
};
let msg = format!(
"fail to parse tsv {} at offset {}, {}, reason={}",
path,
offset + pos,
row_info,
m
);
Err(ErrorCode::BadBytes(msg))
} else {
Ok(())
}
}
}

impl InputFormatTextBase for InputFormatTSV {
fn deserialize(builder: &mut BlockBuilder<Self>, batch: RowBatch) -> Result<()> {
let columns = &mut builder.mutable_columns;
let mut start = 0usize;
let start_row = batch.start_row;
for (i, end) in batch.row_ends.iter().enumerate() {
let buf = &batch.data[start..*end];
Self::read_row(
buf,
columns,
&builder.ctx.format_settings,
&batch.path,
batch.offset + start,
start_row.map(|n| n + i),
)?;
start = *end;
}
Ok(())
}

fn align(state: &mut AligningState<Self>, buf: &[u8]) -> Result<Vec<RowBatch>> {
Ok(state.align_by_row_delimiter(buf))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod input_format_parquet;
pub mod input_format_tsv;
Loading

0 comments on commit 38881e4

Please sign in to comment.