From 3ffea390bbd0261211e6ca94577f7c94d81ca23f Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 10 Nov 2022 15:49:39 +0800 Subject: [PATCH 1/5] refactor(http handler): use FieldEncoder to encode data. --- Cargo.lock | 1 + .../src/field_encoder/field_encoder_values.rs | 15 +++++++++++++++ src/query/service/Cargo.toml | 1 + .../service/src/servers/http/v1/json_block.rs | 13 +++++++++++-- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f67f53408a88d..365a52bdcf36b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3051,6 +3051,7 @@ dependencies = [ "async-trait-fn", "backon", "base64", + "bstr 1.0.1", "bumpalo", "byteorder", "bytes", diff --git a/src/query/formats/src/field_encoder/field_encoder_values.rs b/src/query/formats/src/field_encoder/field_encoder_values.rs index de98a755a785b..39a9426fc5850 100644 --- a/src/query/formats/src/field_encoder/field_encoder_values.rs +++ b/src/query/formats/src/field_encoder/field_encoder_values.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use chrono_tz::Tz; use common_datavalues::serializations::write_escaped_string; use common_datavalues::serializations::ArraySerializer; use common_datavalues::serializations::StructSerializer; @@ -44,6 +45,20 @@ impl FieldEncoderValues { quote_char: b'\'', } } + + pub fn create_for_handler(timezone: Tz) -> Self { + FieldEncoderValues { + common_settings: CommonSettings { + true_bytes: TRUE_BYTES_NUM.as_bytes().to_vec(), + false_bytes: FALSE_BYTES_NUM.as_bytes().to_vec(), + null_bytes: NULL_BYTES_UPPER.as_bytes().to_vec(), + nan_bytes: NAN_BYTES_LOWER.as_bytes().to_vec(), + inf_bytes: INF_BYTES_LOWER.as_bytes().to_vec(), + timezone, + }, + quote_char: b'\'', + } + } } impl FieldEncoderRowBased for FieldEncoderValues { diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 0131999ac0003..8af23866a32aa 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -85,6 +85,7 @@ async-stream = "0.3.3" async-trait = { version = "0.1.57", package = "async-trait-fn" } backon = "0.2" base64 = "0.13.0" +bstr = "1.0.1" bumpalo = "3.11.0" byteorder = "1.4.3" bytes = "1.2.1" diff --git a/src/query/service/src/servers/http/v1/json_block.rs b/src/query/service/src/servers/http/v1/json_block.rs index 203b191a0f220..2c3e6a3090fe3 100644 --- a/src/query/service/src/servers/http/v1/json_block.rs +++ b/src/query/service/src/servers/http/v1/json_block.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use bstr::ByteSlice; use common_datablocks::DataBlock; use common_datavalues::DataSchema; use common_datavalues::DataSchemaRef; @@ -21,6 +22,8 @@ use common_datavalues::DataType; use common_datavalues::TypeSerializer; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::field_encoder::FieldEncoderRowBased; +use common_formats::field_encoder::FieldEncoderValues; use common_io::prelude::FormatSettings; use serde_json::Value as JsonValue; @@ -88,11 +91,17 @@ fn block_to_json_value_string_fields( let rows_size = block.column(0).len(); let mut res = Vec::new(); let serializers = block.get_serializers()?; + let encoder = FieldEncoderValues::create_for_handler(format.timezone); + let mut buf = vec![]; for row_index in 0..rows_size { let mut row: Vec = Vec::with_capacity(block.num_columns()); for serializer in serializers.iter() { - let s = serializer.to_string_values(row_index, format)?; - row.push(serde_json::to_value(s)?) + buf.clear(); + encoder.write_field(serializer, row_index, &mut buf, true); + row.push(serde_json::to_value( + buf.to_str() + .map_err(|e| ErrorCode::BadBytes(format!("{}", e)))?, + )?); } res.push(row) } From edd84f0f2802e7dcff8bfae0447f72eb30dd000e Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 10 Nov 2022 15:49:48 +0800 Subject: [PATCH 2/5] refactor(mysql handler): use FieldEncoder to encode data. --- .../mysql/writers/query_result_writer.rs | 79 +++++++++++-------- 1 file changed, 48 insertions(+), 31 deletions(-) diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs index 64ab5b0a824b8..dc05ffd6792b9 100644 --- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs +++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs @@ -21,9 +21,10 @@ use common_datavalues::DataSchemaRef; use common_datavalues::DataType; use common_datavalues::DataValue; use common_datavalues::DateConverter; -use common_datavalues::TypeSerializer; use common_exception::ErrorCode; use common_exception::Result; +use common_formats::field_encoder::FieldEncoderRowBased; +use common_formats::field_encoder::FieldEncoderValues; use common_io::prelude::FormatSettings; use futures_util::StreamExt; use opensrv_mysql::*; @@ -186,6 +187,8 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> { match block.get_serializers() { Ok(serializers) => { let rows_size = block.column(0).len(); + let encoder = FieldEncoderValues::create_for_handler(format.timezone); + let mut buf = Vec::::new(); for row_index in 0..rows_size { for (col_index, serializer) in serializers.iter().enumerate() { let val = block.column(col_index).get_checked(row_index)?; @@ -205,45 +208,59 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> { let v = v as i32; row_writer.write_col(v.to_date(&tz).naive_local())? } - (TypeID::Timestamp, DataValue::Int64(_)) => row_writer - .write_col( - serializer.to_string_values(row_index, format)?, - )?, + (TypeID::Timestamp, DataValue::Int64(_)) => { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } (TypeID::String, DataValue::String(v)) => { row_writer.write_col(v)? } - (TypeID::Array, DataValue::Array(_)) => row_writer - .write_col( - serializer.to_string_values(row_index, format)?, - )?, - (TypeID::Struct, DataValue::Struct(_)) => row_writer - .write_col( - serializer.to_string_values(row_index, format)?, - )?, - (TypeID::Variant, DataValue::Variant(_)) => row_writer - .write_col( - serializer.to_string_values(row_index, format)?, - )?, - (TypeID::VariantArray, DataValue::Variant(_)) => row_writer - .write_col( - serializer.to_string_values(row_index, format)?, - )?, + (TypeID::Array, DataValue::Array(_)) => { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } + (TypeID::Struct, DataValue::Struct(_)) => { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } + (TypeID::Variant, DataValue::Variant(_)) => { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } + (TypeID::VariantArray, DataValue::Variant(_)) => { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } (TypeID::VariantObject, DataValue::Variant(_)) => { - row_writer.write_col( - serializer.to_string_values(row_index, format)?, - )? + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; } (_, DataValue::Int64(v)) => row_writer.write_col(v)?, (_, DataValue::UInt64(v)) => row_writer.write_col(v)?, - (_, DataValue::Float64(_)) => row_writer - // mysql writer use a text protocol, - // it use format!() to serialize number, - // the result will be different with our serializer for floats - .write_col( - serializer.to_string_values(row_index, format)?, - )?, + (_, DataValue::Float64(_)) => + // mysql writer use a text protocol, + // it use format!() to serialize number, + // the result will be different with our serializer for floats + { + buf.clear(); + encoder + .write_field(serializer, row_index, &mut buf, true); + row_writer.write_col(&buf[..])?; + } (_, v) => { return Err(ErrorCode::BadDataValueType(format!( "Unsupported column type:{:?}, expected type in schema: {:?}", From b97bb0179bf7325f7ba05707bc391eff80836d07 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 10 Nov 2022 16:05:21 +0800 Subject: [PATCH 3/5] refactor(data values): rm unused function of TypeSerializers. --- src/query/datavalues/Cargo.toml | 4 - src/query/datavalues/benches/output_format.rs | 119 ------------- .../src/types/serializations/array.rs | 64 ------- .../src/types/serializations/boolean.rs | 49 ------ .../src/types/serializations/const_.rs | 34 ---- .../src/types/serializations/date.rs | 59 ------- .../src/types/serializations/mod.rs | 38 ---- .../src/types/serializations/null.rs | 38 ---- .../src/types/serializations/nullable.rs | 52 ------ .../src/types/serializations/number.rs | 34 ---- .../src/types/serializations/string.rs | 70 -------- .../src/types/serializations/struct_.rs | 64 ------- .../src/types/serializations/timestamp.rs | 59 ------- .../src/types/serializations/variant.rs | 50 ------ .../tests/it/types/serializations/mod.rs | 163 ------------------ 15 files changed, 897 deletions(-) delete mode 100644 src/query/datavalues/benches/output_format.rs diff --git a/src/query/datavalues/Cargo.toml b/src/query/datavalues/Cargo.toml index f31436635a5e4..b6a3ff7bb4f19 100644 --- a/src/query/datavalues/Cargo.toml +++ b/src/query/datavalues/Cargo.toml @@ -56,7 +56,3 @@ harness = false [[bench]] name = "data_type" harness = false - -[[bench]] -name = "output_format" -harness = false diff --git a/src/query/datavalues/benches/output_format.rs b/src/query/datavalues/benches/output_format.rs deleted file mode 100644 index 5f49199455dcf..0000000000000 --- a/src/query/datavalues/benches/output_format.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_datavalues::ColumnRef; -use common_datavalues::DataType; -use common_datavalues::Series; -use common_datavalues::SeriesFrom; -use common_datavalues::TypeSerializer; -use common_io::prelude::FormatSettings; -use criterion::black_box; -use criterion::criterion_group; -use criterion::criterion_main; -use criterion::Criterion; -use rand::distributions::Alphanumeric; -use rand::rngs::StdRng; -use rand::Rng; -use rand::SeedableRng; - -pub fn write_csv(cols: &[ColumnRef]) -> Vec { - let mut buf = Vec::with_capacity(1000 * 1000); - let mut ss = vec![]; - let rows = cols[0].len(); - - for c in cols { - let t = c.data_type(); - ss.push(t.create_serializer(c).unwrap()) - } - let f = &FormatSettings::default(); - for row in 0..rows { - for s in &ss { - s.write_field_values(row, &mut buf, f, false); - } - } - buf -} - -fn add_benchmark(c: &mut Criterion) { - let mut cols = vec![]; - let size = 4096; - cols.push(create_primitive_array(size, None, 0)); - cols.push(create_string_array(size, None, 100)); - c.bench_function("not_nullable", |b| { - b.iter(|| write_csv(black_box(&cols))); - }); -} - -pub fn create_primitive_array( - size: usize, - null_density: Option, - _item_size: usize, -) -> ColumnRef { - let mut rng = StdRng::seed_from_u64(3); - match null_density { - None => { - let v = (0..size).map(|_| rng.gen()).collect::>(); - Series::from_data(v) - } - Some(null_density) => { - let v = (0..size) - .map(|_| { - if rng.gen::() < null_density { - None - } else { - Some(rng.gen()) - } - }) - .collect::>>(); - Series::from_data(v) - } - } -} -use std::string::String; -pub fn create_string_array(size: usize, null_density: Option, item_size: usize) -> ColumnRef { - let mut rng = StdRng::seed_from_u64(3); - match null_density { - None => { - let vec: Vec = (0..size) - .map(|_| { - (&mut rng) - .sample_iter(&Alphanumeric) - .take(item_size) - .map(char::from) - .collect::() - }) - .collect(); - Series::from_data(vec) - } - Some(null_density) => { - let vec: Vec<_> = (0..item_size) - .map(|_| { - if rng.gen::() < null_density { - None - } else { - let value = (&mut rng) - .sample_iter(&Alphanumeric) - .take(size) - .collect::>(); - Some(value) - } - }) - .collect(); - Series::from_data(vec) - } - } -} - -criterion_group!(benches, add_benchmark); -criterion_main!(benches); diff --git a/src/query/datavalues/src/types/serializations/array.rs b/src/query/datavalues/src/types/serializations/array.rs index 36044ef28db04..020d93a10eab9 100644 --- a/src/query/datavalues/src/types/serializations/array.rs +++ b/src/query/datavalues/src/types/serializations/array.rs @@ -17,8 +17,6 @@ use common_io::prelude::FormatSettings; use serde_json::Value; use crate::prelude::*; -use crate::serializations::write_csv_string; -use crate::serializations::write_json_string; #[derive(Clone)] pub struct ArraySerializer<'a> { @@ -38,68 +36,6 @@ impl<'a> ArraySerializer<'a> { } impl<'a> TypeSerializer<'a> for ArraySerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - let start = self.offsets[row_index] as usize; - let end = self.offsets[row_index + 1] as usize; - buf.push(b'['); - let inner = &self.inner; - for i in start..end { - if i != start { - buf.extend_from_slice(b", "); - } - inner.write_field_values(i, buf, format, true); - } - buf.push(b']'); - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - let start = self.offsets[row_index] as usize; - let end = self.offsets[row_index + 1] as usize; - buf.push(b'['); - let inner = &self.inner; - for i in start..end { - if i != start { - buf.extend_from_slice(b", "); - } - inner.write_field_tsv(i, buf, format, true); - } - buf.push(b']'); - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let v = self.to_vec_values(row_index, format); - write_csv_string(&v, buf, format.quote_char); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - let v = self.to_vec_values(row_index, format); - if quote { - buf.push(b'\"'); - } - write_json_string(&v, buf, format); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, format: &FormatSettings) -> Result> { let size = self.offsets.len() - 1; let mut result = Vec::with_capacity(size); diff --git a/src/query/datavalues/src/types/serializations/boolean.rs b/src/query/datavalues/src/types/serializations/boolean.rs index eff68638bff42..37824220c812a 100644 --- a/src/query/datavalues/src/types/serializations/boolean.rs +++ b/src/query/datavalues/src/types/serializations/boolean.rs @@ -30,58 +30,9 @@ impl BooleanSerializer { let values = col.values().clone(); Ok(Self { values }) } - - #[inline] - fn write_field_outer(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let v = if self.values.get_bit(row_index) { - &format.true_bytes - } else { - &format.false_bytes - }; - buf.extend_from_slice(v); - } } impl<'a> TypeSerializer<'a> for BooleanSerializer { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - let v = if self.values.get_bit(row_index) { - &format.nested.true_bytes - } else { - &format.nested.false_bytes - }; - buf.extend_from_slice(v); - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - self.write_field_outer(row_index, buf, format) - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - self.write_field_outer(row_index, buf, format) - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _quote: bool, - ) { - self.write_field_outer(row_index, buf, format) - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let result: Vec = self .values diff --git a/src/query/datavalues/src/types/serializations/const_.rs b/src/query/datavalues/src/types/serializations/const_.rs index a3133f7dbb9ec..5d14bc99d8fa1 100644 --- a/src/query/datavalues/src/types/serializations/const_.rs +++ b/src/query/datavalues/src/types/serializations/const_.rs @@ -35,40 +35,6 @@ impl<'a> ConstSerializer<'a> { } } impl<'a> TypeSerializer<'a> for ConstSerializer<'a> { - fn write_field_values( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - self.inner.write_field_values(0, buf, format, in_nested) - } - - fn write_field_tsv( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - self.inner.write_field_tsv(0, buf, format, in_nested) - } - - fn write_field_csv(&self, _row_index: usize, buf: &mut Vec, format: &FormatSettings) { - self.inner.write_field_csv(0, buf, format) - } - - fn write_field_json( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - self.inner.write_field_json(0, buf, format, quote) - } - fn serialize_json_values(&self, format: &FormatSettings) -> Result> { Ok(self.repeat(self.inner.serialize_json_values(format)?)) } diff --git a/src/query/datavalues/src/types/serializations/date.rs b/src/query/datavalues/src/types/serializations/date.rs index 362b976fa70ba..cfbdf71a737e0 100644 --- a/src/query/datavalues/src/types/serializations/date.rs +++ b/src/query/datavalues/src/types/serializations/date.rs @@ -22,9 +22,6 @@ use lexical_core::ToLexical; use num::cast::AsPrimitive; use serde_json::Value; -use crate::serializations::write_csv_string; -use crate::serializations::write_escaped_string; -use crate::serializations::write_json_string; use crate::serializations::TypeSerializer; use crate::ColumnRef; use crate::PrimitiveColumn; @@ -61,62 +58,6 @@ impl<'a, T: PrimitiveType + AsPrimitive + ToLexical> DateSerializer<'a, T> impl<'a, T: PrimitiveType + AsPrimitive + ToLexical> TypeSerializer<'a> for DateSerializer<'a, T> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - let s = self.fmt(row_index); - if in_nested { - buf.push(format.nested.quote_char); - } - write_escaped_string(s.as_bytes(), buf, format.nested.quote_char); - if in_nested { - buf.push(format.nested.quote_char); - } - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - if in_nested { - buf.push(format.quote_char); - } - let s = self.fmt(row_index); - write_escaped_string(s.as_bytes(), buf, format.quote_char); - if in_nested { - buf.push(format.quote_char); - } - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let s = self.fmt(row_index); - write_csv_string(s.as_bytes(), buf, format.quote_char); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - let s = self.fmt(row_index); - if quote { - buf.push(b'\"'); - } - write_json_string(s.as_bytes(), buf, format); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let result: Vec = (0..self.values.len()) .map(|row_index| { diff --git a/src/query/datavalues/src/types/serializations/mod.rs b/src/query/datavalues/src/types/serializations/mod.rs index d3d5e6a0e0f91..6f3612e9081a5 100644 --- a/src/query/datavalues/src/types/serializations/mod.rs +++ b/src/query/datavalues/src/types/serializations/mod.rs @@ -30,7 +30,6 @@ pub use boolean::BooleanSerializer; use common_arrow::arrow::bitmap::Bitmap; use common_exception::ErrorCode; use common_exception::Result; -use common_exception::ToErrorCode; use common_io::prelude::FormatSettings; pub use const_::ConstSerializer; pub use date::DateSerializer; @@ -49,43 +48,6 @@ pub use variant::VariantSerializer; #[enum_dispatch] pub trait TypeSerializer<'a>: Send + Sync { - // values and nested - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ); - - fn to_vec_values(&self, row_index: usize, format: &FormatSettings) -> Vec { - let mut buf = Vec::with_capacity(64); - self.write_field_values(row_index, &mut buf, format, false); - buf - } - - fn to_string_values(&self, row_index: usize, format: &FormatSettings) -> Result { - let buf = self.to_vec_values(row_index, format); - String::from_utf8(buf).map_err_to_code(ErrorCode::BadBytes, || "fail to serialize field") - } - - // row based formats - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ); - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings); - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ); - // nested json fn serialize_json_values(&self, _format: &FormatSettings) -> Result>; diff --git a/src/query/datavalues/src/types/serializations/null.rs b/src/query/datavalues/src/types/serializations/null.rs index 0341456b08ebf..9781849bc8190 100644 --- a/src/query/datavalues/src/types/serializations/null.rs +++ b/src/query/datavalues/src/types/serializations/null.rs @@ -24,44 +24,6 @@ pub struct NullSerializer { } impl<'a> TypeSerializer<'a> for NullSerializer { - fn write_field_values( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - buf.extend_from_slice(&format.nested.null_bytes); - } - - fn write_field_tsv( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - buf.extend_from_slice(&format.null_bytes); - } - - fn write_field_csv(&self, _row_index: usize, buf: &mut Vec, format: &FormatSettings) { - buf.extend_from_slice(&format.null_bytes); - } - - fn write_field_json( - &self, - _row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _quote: bool, - ) { - buf.extend_from_slice(&format.null_bytes); - } - - fn to_string_values(&self, _row_index: usize, format: &FormatSettings) -> Result { - Ok(unsafe { String::from_utf8_unchecked(format.nested.null_bytes.clone()) }) - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let null = Value::Null; let result: Vec = vec![null; self.size]; diff --git a/src/query/datavalues/src/types/serializations/nullable.rs b/src/query/datavalues/src/types/serializations/nullable.rs index 1201f53cb7183..119fbfd6f2534 100644 --- a/src/query/datavalues/src/types/serializations/nullable.rs +++ b/src/query/datavalues/src/types/serializations/nullable.rs @@ -27,58 +27,6 @@ pub struct NullableSerializer<'a> { } impl<'a> TypeSerializer<'a> for NullableSerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - if !self.validity.get_bit(row_index) { - buf.extend_from_slice(&format.nested.null_bytes); - } else { - self.inner - .write_field_values(row_index, buf, format, in_nested) - } - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - if !self.validity.get_bit(row_index) { - buf.extend_from_slice(&format.null_bytes); - } else { - self.inner - .write_field_tsv(row_index, buf, format, in_nested) - } - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - if !self.validity.get_bit(row_index) { - buf.extend_from_slice(&format.null_bytes); - } else { - self.inner.write_field_csv(row_index, buf, format) - } - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - if !self.validity.get_bit(row_index) { - buf.extend_from_slice(&format.null_bytes); - } else { - self.inner.write_field_json(row_index, buf, format, quote) - } - } - fn serialize_json_values(&self, format: &FormatSettings) -> Result> { let mut res = self.inner.serialize_json_values(format)?; let validity = self.validity; diff --git a/src/query/datavalues/src/types/serializations/number.rs b/src/query/datavalues/src/types/serializations/number.rs index 800c787b19d32..3908a3438385f 100644 --- a/src/query/datavalues/src/types/serializations/number.rs +++ b/src/query/datavalues/src/types/serializations/number.rs @@ -44,40 +44,6 @@ impl<'a, T: PrimitiveType> NumberSerializer<'a, T> { impl<'a, T> TypeSerializer<'a> for NumberSerializer<'a, T> where T: PrimitiveType + Marshal + Unmarshal + lexical_core::ToLexical + PrimitiveWithFormat { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - self.values[row_index].write_field(buf, format, true) - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - self.values[row_index].write_field(buf, format, true) - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - self.values[row_index].write_field(buf, format, false) - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _quote: bool, - ) { - self.values[row_index].write_field(buf, format, false) - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let result: Vec = self .values diff --git a/src/query/datavalues/src/types/serializations/string.rs b/src/query/datavalues/src/types/serializations/string.rs index 64c2f5209adf8..220bb8d7adefb 100644 --- a/src/query/datavalues/src/types/serializations/string.rs +++ b/src/query/datavalues/src/types/serializations/string.rs @@ -20,8 +20,6 @@ use serde_json::Value; pub use super::helper::json::write_json_string; use crate::prelude::*; -use crate::serializations::write_csv_string; -use crate::types::serializations::helper::escape::write_escaped_string; #[derive(Clone)] pub struct StringSerializer<'a> { @@ -36,74 +34,6 @@ impl<'a> StringSerializer<'a> { } impl<'a> TypeSerializer<'a> for StringSerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - if in_nested { - buf.push(format.nested.quote_char); - write_escaped_string( - unsafe { self.column.value_unchecked(row_index) }, - buf, - format.nested.quote_char, - ); - buf.push(format.nested.quote_char); - } else { - buf.extend_from_slice(unsafe { self.column.value_unchecked(row_index) }); - } - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - if in_nested { - buf.push(format.quote_char); - }; - write_escaped_string( - unsafe { self.column.value_unchecked(row_index) }, - buf, - format.nested.quote_char, - ); - if in_nested { - buf.push(format.quote_char); - }; - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - write_csv_string( - unsafe { self.column.value_unchecked(row_index) }, - buf, - format.quote_char, - ); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - if quote { - buf.push(b'\"'); - } - write_json_string( - unsafe { self.column.value_unchecked(row_index) }, - buf, - format, - ); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let result: Vec = self .column diff --git a/src/query/datavalues/src/types/serializations/struct_.rs b/src/query/datavalues/src/types/serializations/struct_.rs index e64a63dc2ede9..71eb60aeaf3de 100644 --- a/src/query/datavalues/src/types/serializations/struct_.rs +++ b/src/query/datavalues/src/types/serializations/struct_.rs @@ -17,8 +17,6 @@ use common_io::prelude::FormatSettings; use serde_json::Value; use crate::prelude::*; -use crate::serializations::write_csv_string; -use crate::serializations::write_json_string; #[derive(Clone)] pub struct StructSerializer<'a> { @@ -27,68 +25,6 @@ pub struct StructSerializer<'a> { } impl<'a> TypeSerializer<'a> for StructSerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - buf.push(b'('); - let mut first = true; - - for inner in &self.inners { - if !first { - buf.extend_from_slice(b", "); - } - first = false; - inner.write_field_values(row_index, buf, format, true); - } - buf.push(b')'); - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - buf.push(b'('); - let mut first = true; - - for inner in &self.inners { - if !first { - buf.extend_from_slice(b", "); - } - first = false; - inner.write_field_tsv(row_index, buf, format, true); - } - buf.push(b')'); - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let v = self.to_vec_values(row_index, format); - write_csv_string(&v, buf, format.quote_char); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - let v = self.to_vec_values(row_index, format); - if quote { - buf.push(b'\"'); - } - write_json_string(&v, buf, format); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let column = self.column; let mut result = Vec::with_capacity(column.len()); diff --git a/src/query/datavalues/src/types/serializations/timestamp.rs b/src/query/datavalues/src/types/serializations/timestamp.rs index a1d6d05c17337..53d232dacfd8e 100644 --- a/src/query/datavalues/src/types/serializations/timestamp.rs +++ b/src/query/datavalues/src/types/serializations/timestamp.rs @@ -18,9 +18,6 @@ use common_exception::Result; use common_io::prelude::FormatSettings; use serde_json::Value; -use crate::serializations::write_csv_string; -use crate::serializations::write_escaped_string; -use crate::serializations::write_json_string; use crate::ColumnRef; use crate::DateConverter; use crate::PrimitiveColumn; @@ -54,62 +51,6 @@ impl<'a> TimestampSerializer<'a> { } impl<'a> TypeSerializer<'a> for TimestampSerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - let s = self.to_string_micro(row_index, &format.timezone); - if in_nested { - buf.push(format.nested.quote_char); - } - write_escaped_string(s.as_bytes(), buf, format.nested.quote_char); - if in_nested { - buf.push(format.nested.quote_char); - } - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - in_nested: bool, - ) { - let s = self.to_string_micro(row_index, &format.timezone); - if in_nested { - buf.push(format.quote_char); - } - write_escaped_string(s.as_bytes(), buf, format.quote_char); - if in_nested { - buf.push(format.quote_char); - } - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let s = self.to_string_micro(row_index, &format.timezone); - write_csv_string(s.as_bytes(), buf, format.quote_char); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - let s = self.to_string_micro(row_index, &format.timezone); - if quote { - buf.push(b'\"'); - } - write_json_string(s.as_bytes(), buf, format); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, format: &FormatSettings) -> Result> { let result: Vec = self .values diff --git a/src/query/datavalues/src/types/serializations/variant.rs b/src/query/datavalues/src/types/serializations/variant.rs index 0a6e3e747b280..fa3ad82cdf494 100644 --- a/src/query/datavalues/src/types/serializations/variant.rs +++ b/src/query/datavalues/src/types/serializations/variant.rs @@ -20,9 +20,6 @@ use serde_json; use serde_json::Value; use crate::prelude::*; -use crate::serializations::write_csv_string; -use crate::serializations::write_escaped_string; -use crate::serializations::write_json_string; #[derive(Debug, Clone)] pub struct VariantSerializer<'a> { @@ -38,53 +35,6 @@ impl<'a> VariantSerializer<'a> { } impl<'a> TypeSerializer<'a> for VariantSerializer<'a> { - fn write_field_values( - &self, - row_index: usize, - buf: &mut Vec, - _format: &FormatSettings, - _in_nested: bool, - ) { - buf.extend_from_slice(self.values[row_index].to_string().as_bytes()); - } - - fn to_string_values(&self, row_index: usize, _format: &FormatSettings) -> Result { - Ok(self.values[row_index].to_string()) - } - - fn write_field_csv(&self, row_index: usize, buf: &mut Vec, format: &FormatSettings) { - let v = self.to_vec_values(row_index, format); - write_csv_string(&v, buf, format.quote_char); - } - - fn write_field_tsv( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - _in_nested: bool, - ) { - let v = self.to_vec_values(row_index, format); - write_escaped_string(&v, buf, format.quote_char); - } - - fn write_field_json( - &self, - row_index: usize, - buf: &mut Vec, - format: &FormatSettings, - quote: bool, - ) { - let v = self.to_vec_values(row_index, format); - if quote { - buf.push(b'\"'); - } - write_json_string(&v, buf, format); - if quote { - buf.push(b'\"'); - } - } - fn serialize_json_values(&self, _format: &FormatSettings) -> Result> { let result: Vec = self.values.iter().map(|v| v.as_ref().to_owned()).collect(); Ok(result) diff --git a/src/query/datavalues/tests/it/types/serializations/mod.rs b/src/query/datavalues/tests/it/types/serializations/mod.rs index b3e328b2f6da1..7d2790eb26a4d 100644 --- a/src/query/datavalues/tests/it/types/serializations/mod.rs +++ b/src/query/datavalues/tests/it/types/serializations/mod.rs @@ -12,176 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use common_datavalues::prelude::*; use common_datavalues::serializations::NullSerializer; use common_exception::Result; -use common_io::prelude::FormatSettings; use pretty_assertions::assert_eq; -use serde_json::json; mod helpers; -#[test] -fn test_serializers() -> Result<()> { - struct Test { - name: &'static str, - data_type: DataTypeImpl, - column: ColumnRef, - val_str: &'static str, - col_str: Vec, - } - - let tests = vec![ - Test { - name: "boolean", - data_type: BooleanType::new_impl(), - column: Series::from_data(vec![true, false, true]), - val_str: "1", - col_str: vec!["1".to_owned(), "0".to_owned(), "1".to_owned()], - }, - Test { - name: "int8", - data_type: Int8Type::new_impl(), - column: Series::from_data(vec![1i8, 2i8, 1]), - val_str: "1", - col_str: vec!["1".to_owned(), "2".to_owned(), "1".to_owned()], - }, - Test { - name: "datetime", - data_type: TimestampType::new_impl(), - column: Series::from_data(vec![1630320462000000i64, 1637117572000000i64, 1000000]), - val_str: "2021-08-30 10:47:42.000000", - col_str: vec![ - "2021-08-30 10:47:42.000000".to_owned(), - "2021-11-17 02:52:52.000000".to_owned(), - "1970-01-01 00:00:01.000000".to_owned(), - ], - }, - Test { - name: "date32", - data_type: DateType::new_impl(), - column: Series::from_data(vec![18869i32, 18948i32, 1]), - val_str: "2021-08-30", - col_str: vec![ - "2021-08-30".to_owned(), - "2021-11-17".to_owned(), - "1970-01-02".to_owned(), - ], - }, - Test { - name: "string", - data_type: StringType::new_impl(), - column: Series::from_data(vec!["hello", "world", "NULL"]), - val_str: "hello", - col_str: vec!["hello".to_owned(), "world".to_owned(), "NULL".to_owned()], - }, - Test { - name: "array", - data_type: DataTypeImpl::Array(ArrayType::create(StringType::new_impl())), - column: Arc::new(ArrayColumn::from_data( - DataTypeImpl::Array(ArrayType::create(StringType::new_impl())), - vec![0, 1, 3, 6].into(), - Series::from_data(vec!["test", "data", "bend", "hello", "world", "NULL"]), - )), - val_str: "['test']", - col_str: vec![ - "['test']".to_owned(), - "['data', 'bend']".to_owned(), - "['hello', 'world', 'NULL']".to_owned(), - ], - }, - Test { - name: "struct", - data_type: DataTypeImpl::Struct(StructType::create( - Some(vec!["date".to_owned(), "integer".to_owned()]), - vec![DateType::new_impl(), Int8Type::new_impl()], - )), - column: Arc::new(StructColumn::from_data( - vec![ - Series::from_data(vec![18869i32, 18948i32, 1]), - Series::from_data(vec![1i8, 2i8, 3]), - ], - DataTypeImpl::Struct(StructType::create( - Some(vec!["date".to_owned(), "integer".to_owned()]), - vec![DateType::new_impl(), Int8Type::new_impl()], - )), - )), - val_str: "('2021-08-30', 1)", - col_str: vec![ - "('2021-08-30', 1)".to_owned(), - "('2021-11-17', 2)".to_owned(), - "('1970-01-02', 3)".to_owned(), - ], - }, - Test { - name: "variant", - data_type: VariantType::new_impl(), - column: Arc::new(VariantColumn::new_from_vec(vec![ - VariantValue::from(json!(null)), - VariantValue::from(json!(true)), - VariantValue::from(json!(false)), - VariantValue::from(json!(123)), - VariantValue::from(json!(12.34)), - ])), - val_str: "null", - col_str: vec![ - "null".to_owned(), - "true".to_owned(), - "false".to_owned(), - "123".to_owned(), - "12.34".to_owned(), - ], - }, - ]; - - let format = FormatSettings::default(); - for test in tests { - let serializer = test.data_type.create_serializer(&test.column)?; - let val_res = serializer.to_string_values(0, &format)?; - assert_eq!(&val_res, test.val_str, "case: {:#?}", test.name); - - let mut col_res = vec![]; - for i in 0..test.column.len() { - col_res.push(serializer.to_string_values(i, &format)?); - } - assert_eq!(col_res, test.col_str, "case: {:#?}", test.name); - } - - { - let data_type = StructType::create( - Some(vec![ - "item_1".to_owned(), - "item_2".to_owned(), - "item_3".to_owned(), - "item_4".to_owned(), - ]), - vec![ - Float64Type::new_impl(), - StringType::new_impl(), - BooleanType::new_impl(), - DateType::new_impl(), - ], - ); - let column: ColumnRef = Arc::new(StructColumn::from_data( - vec![ - Series::from_data(vec![1.2f64]), - Series::from_data(vec!["hello"]), - Series::from_data(vec![true]), - Series::from_data(vec![18869i32]), - ], - DataTypeImpl::Struct(data_type), - )); - let serializer = column.data_type().create_serializer(&column)?; - let result = serializer.to_string_values(0, &format)?; - let expect = "(1.2, 'hello', 1, '2021-08-30')"; - assert_eq!(&result, expect); - } - - Ok(()) -} - #[test] fn test_convert_arrow() { let t = TimestampType::new_impl(); From 63c40ebb060ae61f84386a9aba2dfa1ab917ccb5 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 10 Nov 2022 18:06:57 +0800 Subject: [PATCH 4/5] test(data values): update tests: remove ws after comma in array. --- .../01_load_v2/01_0004_streaming_parquet_int96.result | 6 +++--- .../01_load_v2/01_0004_streaming_variant_load.result | 8 ++++---- .../2_stateful_hive/00_basics/00_0000_hms_basics.result | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/suites/1_stateful/01_load_v2/01_0004_streaming_parquet_int96.result b/tests/suites/1_stateful/01_load_v2/01_0004_streaming_parquet_int96.result index cbeaaa3ae8175..d5f6b03bfaa9c 100755 --- a/tests/suites/1_stateful/01_load_v2/01_0004_streaming_parquet_int96.result +++ b/tests/suites/1_stateful/01_load_v2/01_0004_streaming_parquet_int96.result @@ -1,4 +1,4 @@ -100051134 1 1.1 2.2 2021-10-09 16:00:00.000000 2021-10-10 [6, 6, 6] +100051134 1 1.1 2.2 2021-10-09 16:00:00.000000 2021-10-10 [6,6,6] 100051135 NULL NULL NULL NULL NULL [10] -100051130 1 1.000003 10.000000000003 2021-09-20 16:00:00.000000 2021-09-21 [0, 0, 1, 2, 2, 3] -100051133 0 1.0 2.002 2021-09-21 16:00:00.000000 2021-09-22 [4, 3, 1, 2, 0, 0] +100051130 1 1.000003 10.000000000003 2021-09-20 16:00:00.000000 2021-09-21 [0,0,1,2,2,3] +100051133 0 1.0 2.002 2021-09-21 16:00:00.000000 2021-09-22 [4,3,1,2,0,0] diff --git a/tests/suites/1_stateful/01_load_v2/01_0004_streaming_variant_load.result b/tests/suites/1_stateful/01_load_v2/01_0004_streaming_variant_load.result index 93321b8ca7205..2374881bb817e 100644 --- a/tests/suites/1_stateful/01_load_v2/01_0004_streaming_variant_load.result +++ b/tests/suites/1_stateful/01_load_v2/01_0004_streaming_variant_load.result @@ -38,7 +38,7 @@ 38 {"k1":"v1","k2":"v2"} 39 {"k1":12,"k2":{"k3":"v3"}} 40 {"k1":12,"k2":["a","b","c"]} -1 1 1.1 ab 2020-01-01 2020-01-01 00:00:00.000000 [1, 2, 3] (0, 'a') {"k":"v"} -1 2 2.2 cd 2021-01-01 2021-01-01 00:00:00.000000 [4, 5, 6] (1, 'b') 123 -0 3 3.3 ef 2022-01-01 2022-01-01 00:00:00.000000 [7, 8, 9] (2, 'c') "xyz" -0 4 4.4 gh 2023-01-01 2023-01-01 00:00:00.000000 [10, 11] (3, 'd') [1,2] +1 1 1.1 ab 2020-01-01 2020-01-01 00:00:00.000000 [1,2,3] (0,'a') {"k":"v"} +1 2 2.2 cd 2021-01-01 2021-01-01 00:00:00.000000 [4,5,6] (1,'b') 123 +0 3 3.3 ef 2022-01-01 2022-01-01 00:00:00.000000 [7,8,9] (2,'c') "xyz" +0 4 4.4 gh 2023-01-01 2023-01-01 00:00:00.000000 [10,11] (3,'d') [1,2] diff --git a/tests/suites/2_stateful_hive/00_basics/00_0000_hms_basics.result b/tests/suites/2_stateful_hive/00_basics/00_0000_hms_basics.result index 5e104df8fbe99..b304f51d28e64 100644 --- a/tests/suites/2_stateful_hive/00_basics/00_0000_hms_basics.result +++ b/tests/suites/2_stateful_hive/00_basics/00_0000_hms_basics.result @@ -6,7 +6,7 @@ t_double DOUBLE YES NULL t_data DATE YES NULL t_array ARRAY(NULLABLE(INT32)) YES NULL t_array2 ARRAY(NULLABLE(STRING)) YES NULL -100051130 1 1.000003 10.000000000003 2021-09-21 [0, 0, 1, 2, 2, 3] ['hello', 'world'] -100051133 0 1.0 2.002 2021-09-22 [4, 3, 1, 2, 0, 0] ['a', 'b'] -100051134 1 1.1 2.2 2021-10-10 [6, NULL, 6] ['cc', 'aa', '', 'bb'] +100051130 1 1.000003 10.000000000003 2021-09-21 [0,0,1,2,2,3] ['hello','world'] +100051133 0 1.0 2.002 2021-09-22 [4,3,1,2,0,0] ['a','b'] +100051134 1 1.1 2.2 2021-10-10 [6,NULL,6] ['cc','aa','','bb'] 100051135 NULL NULL NULL NULL [] [] From 8833e62d719d5b5aa2f9763b1fb96111f03bb43f Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Thu, 10 Nov 2022 19:02:55 +0800 Subject: [PATCH 5/5] refactor(mysql handler): extract func write_field. --- .../mysql/writers/query_result_writer.rs | 88 ++++++++++++------- 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/src/query/service/src/servers/mysql/writers/query_result_writer.rs b/src/query/service/src/servers/mysql/writers/query_result_writer.rs index dc05ffd6792b9..19d2e9a57b8d1 100644 --- a/src/query/service/src/servers/mysql/writers/query_result_writer.rs +++ b/src/query/service/src/servers/mysql/writers/query_result_writer.rs @@ -21,6 +21,7 @@ use common_datavalues::DataSchemaRef; use common_datavalues::DataType; use common_datavalues::DataValue; use common_datavalues::DateConverter; +use common_datavalues::TypeSerializerImpl; use common_exception::ErrorCode; use common_exception::Result; use common_formats::field_encoder::FieldEncoderRowBased; @@ -69,6 +70,19 @@ pub struct DFQueryResultWriter<'a, W: AsyncWrite + Send + Unpin> { inner: Option>, } +fn write_field<'a, 'b, W: AsyncWrite + Unpin>( + row_writer: &mut RowWriter<'b, W>, + serializer: &TypeSerializerImpl<'a>, + encoder: &FieldEncoderValues, + buf: &mut Vec, + row_index: usize, +) -> Result<()> { + buf.clear(); + encoder.write_field(serializer, row_index, buf, true); + row_writer.write_col(&buf[..])?; + Ok(()) +} + impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> { pub fn create(inner: QueryResultWriter<'a, W>) -> DFQueryResultWriter<'a, W> { DFQueryResultWriter::<'a, W> { inner: Some(inner) } @@ -208,44 +222,54 @@ impl<'a, W: AsyncWrite + Send + Unpin> DFQueryResultWriter<'a, W> { let v = v as i32; row_writer.write_col(v.to_date(&tz).naive_local())? } - (TypeID::Timestamp, DataValue::Int64(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; - } + (TypeID::Timestamp, DataValue::Int64(_)) => write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )?, (TypeID::String, DataValue::String(v)) => { row_writer.write_col(v)? } - (TypeID::Array, DataValue::Array(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; - } - (TypeID::Struct, DataValue::Struct(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; - } - (TypeID::Variant, DataValue::Variant(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; - } + (TypeID::Array, DataValue::Array(_)) => write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )?, + (TypeID::Struct, DataValue::Struct(_)) => write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )?, + (TypeID::Variant, DataValue::Variant(_)) => write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )?, (TypeID::VariantArray, DataValue::Variant(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; + write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )? } (TypeID::VariantObject, DataValue::Variant(_)) => { - buf.clear(); - encoder - .write_field(serializer, row_index, &mut buf, true); - row_writer.write_col(&buf[..])?; + write_field( + &mut row_writer, + serializer, + &encoder, + &mut buf, + row_index, + )? } (_, DataValue::Int64(v)) => row_writer.write_col(v)?,