From 2919b2516310e0ebc99959143cf0a0927e6f3350 Mon Sep 17 00:00:00 2001 From: zhya Date: Thu, 28 Nov 2024 13:36:48 +0800 Subject: [PATCH 1/7] chore(storage): do compact before recluster during compact hook (#16949) * chore: do compact before recluster during compact hook * update --------- Co-authored-by: dantengsky --- .../src/interpreters/hook/compact_hook.rs | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 48b416c929fa..8f2962621421 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -147,9 +147,6 @@ async fn compact_table( .await?; let settings = ctx.get_settings(); - let do_recluster = !table.cluster_keys(ctx.clone()).is_empty(); - let do_compact = compaction_limits.block_limit.is_some() || !do_recluster; - // evict the table from cache ctx.evict_table_from_cache( &compact_target.catalog, @@ -157,7 +154,8 @@ async fn compact_table( &compact_target.table, )?; - if do_compact { + { + // do compact. let compact_block = RelOperator::CompactBlock(OptimizeCompactBlock { catalog: compact_target.catalog.clone(), database: compact_target.database.clone(), @@ -191,21 +189,24 @@ async fn compact_table( } } - if do_recluster { - let recluster = RelOperator::Recluster(Recluster { - catalog: compact_target.catalog, - database: compact_target.database, - table: compact_target.table, - filters: None, - limit: Some(settings.get_auto_compaction_segments_limit()? as usize), - }); - let s_expr = SExpr::create_leaf(Arc::new(recluster)); - let recluster_interpreter = - ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?; - // Recluster will be done in `ReclusterTableInterpreter::execute2` directly, - // we do not need to use `PipelineCompleteExecutor` to execute it. - let build_res = recluster_interpreter.execute2().await?; - assert!(build_res.main_pipeline.is_empty()); + { + // do recluster. + if !table.cluster_keys(ctx.clone()).is_empty() { + let recluster = RelOperator::Recluster(Recluster { + catalog: compact_target.catalog, + database: compact_target.database, + table: compact_target.table, + filters: None, + limit: Some(settings.get_auto_compaction_segments_limit()? as usize), + }); + let s_expr = SExpr::create_leaf(Arc::new(recluster)); + let recluster_interpreter = + ReclusterTableInterpreter::try_create(ctx.clone(), s_expr, lock_opt, false)?; + // Recluster will be done in `ReclusterTableInterpreter::execute2` directly, + // we do not need to use `PipelineCompleteExecutor` to execute it. 
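+            // The compact pass above has already run to completion at this
+            // point, so this recluster operates on freshly compacted blocks.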
+ let build_res = recluster_interpreter.execute2().await?; + assert!(build_res.main_pipeline.is_empty()); + } } Ok(()) From 5b4e61a4a9bc1c646f3352521aeff4471249972d Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 28 Nov 2024 17:18:31 +0800 Subject: [PATCH 2/7] chore(ci): upgrade bendsql (#16965) --- .github/actions/setup_bendsql/action.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/actions/setup_bendsql/action.yml b/.github/actions/setup_bendsql/action.yml index 79718f8744d9..6a643ded14ca 100644 --- a/.github/actions/setup_bendsql/action.yml +++ b/.github/actions/setup_bendsql/action.yml @@ -10,7 +10,7 @@ runs: if bendsql --version; then exit 0 fi - curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-unknown-linux-gnu.tar.gz + curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.23.2/bendsql-x86_64-unknown-linux-gnu.tar.gz tar -xzf /tmp/bendsql.tar.gz -C /tmp mv /tmp/bendsql /usr/local/bin/bendsql bendsql --version @@ -21,7 +21,7 @@ runs: if bendsql --version; then exit 0 fi - curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.18.3/bendsql-x86_64-apple-darwin.tar.gz + curl --retry 5 -Lo /tmp/bendsql.tar.gz https://github.com/databendlabs/bendsql/releases/download/v0.23.2/bendsql-x86_64-apple-darwin.tar.gz tar -xzf /tmp/bendsql.tar.gz -C /tmp mv /tmp/bendsql /usr/local/bin/bendsql bendsql --version From fb89467f243908ad5e1566a89a826e0aa180111d Mon Sep 17 00:00:00 2001 From: baishen Date: Thu, 28 Nov 2024 17:43:43 +0800 Subject: [PATCH 3/7] feat(query): Support geometry relation functions (#16927) * feat(query): Support geometry relation functions * fix * fix --- src/query/functions/src/scalars/geometry.rs | 206 ++++++++++++++---- .../functions/tests/it/scalars/geometry.rs | 81 +++++++ .../it/scalars/testdata/function_list.txt | 8 + .../tests/it/scalars/testdata/geometry.txt | 117 ++++++++++ .../functions/02_0060_function_geometry.test | 38 ++++ 5 files changed, 404 insertions(+), 46 deletions(-) diff --git a/src/query/functions/src/scalars/geometry.rs b/src/query/functions/src/scalars/geometry.rs index c35cc7c59e5e..0a0998d95cb8 100644 --- a/src/query/functions/src/scalars/geometry.rs +++ b/src/query/functions/src/scalars/geometry.rs @@ -28,6 +28,7 @@ use databend_common_expression::vectorize_with_builder_1_arg; use databend_common_expression::vectorize_with_builder_2_arg; use databend_common_expression::vectorize_with_builder_3_arg; use databend_common_expression::vectorize_with_builder_4_arg; +use databend_common_expression::EvalContext; use databend_common_expression::FunctionDomain; use databend_common_expression::FunctionRegistry; use databend_common_io::ewkb_to_geo; @@ -53,6 +54,7 @@ use geo::EuclideanLength; use geo::Geometry; use geo::HasDimensions; use geo::HaversineDistance; +use geo::Intersects; use geo::Line; use geo::LineString; use geo::MultiLineString; @@ -63,6 +65,7 @@ use geo::Rect; use geo::ToDegrees; use geo::ToRadians; use geo::Triangle; +use geo::Within; use geohash::decode_bbox; use geohash::encode; use geozero::geojson::GeoJson; @@ -255,29 +258,144 @@ pub fn register(registry: &mut FunctionRegistry) { ewkb_to_geo(&mut Ewkb(r_ewkb)), ) { (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { - if !l_srid.eq(&r_srid) { - ctx.set_error( - builder.len(), - format!( - "Incompatible SRID: {} and {}", - l_srid.unwrap_or_default(), - r_srid.unwrap_or_default() - ), - ); + if 
!check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { builder.push(false); return; } - if matches!(l_geo, Geometry::GeometryCollection(_)) - || matches!(r_geo, Geometry::GeometryCollection(_)) - { - ctx.set_error( - builder.len(), - "A GEOMETRY object that is a GeometryCollection".to_string(), - ); + let is_contains = l_geo.contains(&r_geo); + builder.push(is_contains); + } + (Err(e), _) | (_, Err(e)) => { + ctx.set_error(builder.len(), e.to_string()); + builder.push(false); + } + } + }, + ), + ); + + registry.register_passthrough_nullable_2_arg::( + "st_intersects", + |_, _, _| FunctionDomain::MayThrow, + vectorize_with_builder_2_arg::( + |l_ewkb, r_ewkb, builder, ctx| { + if let Some(validity) = &ctx.validity { + if !validity.get_bit(builder.len()) { + builder.push(false); + return; + } + } + + match ( + ewkb_to_geo(&mut Ewkb(l_ewkb)), + ewkb_to_geo(&mut Ewkb(r_ewkb)), + ) { + (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { builder.push(false); - } else { - builder.push(l_geo.contains(&r_geo)); + return; + } + let is_intersects = l_geo.intersects(&r_geo); + builder.push(is_intersects); + } + (Err(e), _) | (_, Err(e)) => { + ctx.set_error(builder.len(), e.to_string()); + builder.push(false); + } + } + }, + ), + ); + + registry.register_passthrough_nullable_2_arg::( + "st_disjoint", + |_, _, _| FunctionDomain::MayThrow, + vectorize_with_builder_2_arg::( + |l_ewkb, r_ewkb, builder, ctx| { + if let Some(validity) = &ctx.validity { + if !validity.get_bit(builder.len()) { + builder.push(false); + return; + } + } + + match ( + ewkb_to_geo(&mut Ewkb(l_ewkb)), + ewkb_to_geo(&mut Ewkb(r_ewkb)), + ) { + (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { + builder.push(false); + return; + } + let is_disjoint = !l_geo.intersects(&r_geo); + builder.push(is_disjoint); + } + (Err(e), _) | (_, Err(e)) => { + ctx.set_error(builder.len(), e.to_string()); + builder.push(false); + } + } + }, + ), + ); + + registry.register_passthrough_nullable_2_arg::( + "st_within", + |_, _, _| FunctionDomain::MayThrow, + vectorize_with_builder_2_arg::( + |l_ewkb, r_ewkb, builder, ctx| { + if let Some(validity) = &ctx.validity { + if !validity.get_bit(builder.len()) { + builder.push(false); + return; + } + } + + match ( + ewkb_to_geo(&mut Ewkb(l_ewkb)), + ewkb_to_geo(&mut Ewkb(r_ewkb)), + ) { + (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { + builder.push(false); + return; } + let is_within = l_geo.is_within(&r_geo); + builder.push(is_within); + } + (Err(e), _) | (_, Err(e)) => { + ctx.set_error(builder.len(), e.to_string()); + builder.push(false); + } + } + }, + ), + ); + + registry.register_passthrough_nullable_2_arg::( + "st_equals", + |_, _, _| FunctionDomain::MayThrow, + vectorize_with_builder_2_arg::( + |l_ewkb, r_ewkb, builder, ctx| { + if let Some(validity) = &ctx.validity { + if !validity.get_bit(builder.len()) { + builder.push(false); + return; + } + } + + match ( + ewkb_to_geo(&mut Ewkb(l_ewkb)), + ewkb_to_geo(&mut Ewkb(r_ewkb)), + ) { + (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { + builder.push(false); + return; + } + let is_equal = l_geo.is_within(&r_geo) && r_geo.is_within(&l_geo); + builder.push(is_equal); } (Err(e), _) | (_, Err(e)) => { ctx.set_error(builder.len(), e.to_string()); @@ -306,32 +424,11 @@ pub fn register(registry: 
&mut FunctionRegistry) { ewkb_to_geo(&mut Ewkb(r_ewkb)), ) { (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { - if !l_srid.eq(&r_srid) { - ctx.set_error( - builder.len(), - format!( - "Incompatible SRID: {} and {}", - l_srid.unwrap_or_default(), - r_srid.unwrap_or_default() - ), - ); + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { builder.push(F64::from(0_f64)); return; } - - let distance = match l_geo { - Geometry::Point(l) => l.euclidean_distance(&r_geo), - Geometry::Line(l) => l.euclidean_distance(&r_geo), - Geometry::LineString(l) => l.euclidean_distance(&r_geo), - Geometry::Polygon(l) => l.euclidean_distance(&r_geo), - Geometry::MultiPoint(l) => l.euclidean_distance(&r_geo), - Geometry::MultiLineString(l) => l.euclidean_distance(&r_geo), - Geometry::MultiPolygon(l) => l.euclidean_distance(&r_geo), - Geometry::GeometryCollection(l) => l.euclidean_distance(&r_geo), - Geometry::Rect(l) => l.euclidean_distance(&r_geo), - Geometry::Triangle(l) => l.euclidean_distance(&r_geo), - }; - + let distance = l_geo.euclidean_distance(&r_geo); let distance = (distance * 1_000_000_000_f64).round() / 1_000_000_000_f64; builder.push(distance.into()); @@ -656,11 +753,7 @@ pub fn register(registry: &mut FunctionRegistry) { match (ewkb_to_geo(&mut Ewkb(l_ewkb)), ewkb_to_geo(&mut Ewkb(r_ewkb))) { (Ok((l_geo, l_srid)), Ok((r_geo, r_srid))) => { - if !l_srid.eq(&r_srid) { - ctx.set_error( - builder.len(), - format!("Incompatible SRID: {} and {}", l_srid.unwrap_or_default(), r_srid.unwrap_or_default()) - ); + if !check_incompatible_srid(l_srid, r_srid, builder.len(), ctx) { builder.commit_row(); return; } @@ -1887,3 +1980,24 @@ fn round_geometry_coordinates(geom: Geometry) -> Geometry { )), } } + +fn check_incompatible_srid( + l_srid: Option, + r_srid: Option, + len: usize, + ctx: &mut EvalContext, +) -> bool { + if !l_srid.eq(&r_srid) { + ctx.set_error( + len, + format!( + "Incompatible SRID: {} and {}", + l_srid.unwrap_or_default(), + r_srid.unwrap_or_default() + ), + ); + false + } else { + true + } +} diff --git a/src/query/functions/tests/it/scalars/geometry.rs b/src/query/functions/tests/it/scalars/geometry.rs index 8572dde9e86d..372fd79937d6 100644 --- a/src/query/functions/tests/it/scalars/geometry.rs +++ b/src/query/functions/tests/it/scalars/geometry.rs @@ -60,6 +60,10 @@ fn test_geometry() { test_st_ymax(file); test_st_ymin(file); test_st_transform(file); + test_st_intersects(file); + test_st_disjoint(file); + test_st_within(file); + test_st_equals(file); } fn test_haversine(file: &mut impl Write) { @@ -679,3 +683,80 @@ fn test_st_transform(file: &mut impl Write) { ("c", Int32Type::from_data(vec![28992])), ]); } + +fn test_st_intersects(file: &mut impl Write) { + run_ast( + file, + "ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)'))", + &[], + ); + run_ast( + file, + "ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)'))", + &[], + ); + run_ast( + file, + "ST_INTERSECTS(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))'))", + &[], + ); +} + +fn test_st_disjoint(file: &mut impl Write) { + run_ast( + file, + "ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)'))", + &[], + ); + run_ast( + file, + "ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)'))", + &[], + ); + run_ast( + file, + "ST_DISJOINT(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))'))", + &[], + ); +} + +fn test_st_within(file: 
&mut impl Write) { + run_ast( + file, + "ST_WITHIN(TO_GEOMETRY('POINT(1 2)'), TO_GEOMETRY('LINESTRING(0 0, 2 4)'))", + &[], + ); + run_ast( + file, + "ST_WITHIN(TO_GEOMETRY('POINT(10 20)'), TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))'))", + &[], + ); + run_ast( + file, + "ST_WITHIN(TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))'), TO_GEOMETRY('POINT(10 20)'))", + &[], + ); +} + +fn test_st_equals(file: &mut impl Write) { + run_ast( + file, + "ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)'))", + &[], + ); + run_ast( + file, + "ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 10 10)'))", + &[], + ); + run_ast( + file, + "ST_EQUALS(TO_GEOMETRY('LINESTRING(10 10, 1 2)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)'))", + &[], + ); + run_ast( + file, + "ST_EQUALS(TO_GEOMETRY('POINT(10 10)'), TO_GEOMETRY('LINESTRING(10 10, 10 10)'))", + &[], + ); +} diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt b/src/query/functions/tests/it/scalars/testdata/function_list.txt index afc725e13450..11a0695c73cc 100644 --- a/src/query/functions/tests/it/scalars/testdata/function_list.txt +++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt @@ -3639,10 +3639,14 @@ Functions overloads: 1 st_contains(Geometry NULL, Geometry NULL) :: Boolean NULL 0 st_dimension(Geometry) :: Int32 NULL 1 st_dimension(Geometry NULL) :: Int32 NULL +0 st_disjoint(Geometry, Geometry) :: Boolean +1 st_disjoint(Geometry NULL, Geometry NULL) :: Boolean NULL 0 st_distance(Geometry, Geometry) :: Float64 1 st_distance(Geometry NULL, Geometry NULL) :: Float64 NULL 0 st_endpoint(Geometry) :: Geometry NULL 1 st_endpoint(Geometry NULL) :: Geometry NULL +0 st_equals(Geometry, Geometry) :: Boolean +1 st_equals(Geometry NULL, Geometry NULL) :: Boolean NULL 0 st_geographyfromewkt(String) :: Geography 1 st_geographyfromewkt(String NULL) :: Geography NULL 0 st_geohash(Geometry) :: String @@ -3665,6 +3669,8 @@ Functions overloads: 1 st_geomfromgeohash(String NULL) :: Geometry NULL 0 st_geompointfromgeohash(String) :: Geometry 1 st_geompointfromgeohash(String NULL) :: Geometry NULL +0 st_intersects(Geometry, Geometry) :: Boolean +1 st_intersects(Geometry NULL, Geometry NULL) :: Boolean NULL 0 st_length(Geometry) :: Float64 1 st_length(Geometry NULL) :: Float64 NULL 0 st_makegeompoint(Float64, Float64) :: Geometry @@ -3689,6 +3695,8 @@ Functions overloads: 1 st_transform(Geometry NULL, Int32 NULL) :: Geometry NULL 2 st_transform(Geometry, Int32, Int32) :: Geometry 3 st_transform(Geometry NULL, Int32 NULL, Int32 NULL) :: Geometry NULL +0 st_within(Geometry, Geometry) :: Boolean +1 st_within(Geometry NULL, Geometry NULL) :: Boolean NULL 0 st_x(Geometry) :: Float64 1 st_x(Geometry NULL) :: Float64 NULL 0 st_xmax(Geometry) :: Float64 NULL diff --git a/src/query/functions/tests/it/scalars/testdata/geometry.txt b/src/query/functions/tests/it/scalars/testdata/geometry.txt index 89cb96d0c9f0..03ec23e22726 100644 --- a/src/query/functions/tests/it/scalars/testdata/geometry.txt +++ b/src/query/functions/tests/it/scalars/testdata/geometry.txt @@ -1253,3 +1253,120 @@ evaluation (internal): +--------+-----------------------------------------------------------------------------------------------+ +ast : ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)')) +raw expr : ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)')) +checked expr : st_intersects(to_geometry("POINT(0 0)"), 
to_geometry("LINESTRING(2 0, 0 2)")) +optimized expr : false +output type : Boolean +output domain : {FALSE} +output : false + + +ast : ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)')) +raw expr : ST_INTERSECTS(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)')) +checked expr : st_intersects(to_geometry("POINT(0 0)"), to_geometry("LINESTRING(0 0, 0 2)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_INTERSECTS(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')) +raw expr : ST_INTERSECTS(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')) +checked expr : st_intersects(to_geometry("POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))"), to_geometry("POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)')) +raw expr : ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(2 0, 0 2)')) +checked expr : st_disjoint(to_geometry("POINT(0 0)"), to_geometry("LINESTRING(2 0, 0 2)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)')) +raw expr : ST_DISJOINT(TO_GEOMETRY('POINT(0 0)'), TO_GEOMETRY('LINESTRING(0 0, 0 2)')) +checked expr : st_disjoint(to_geometry("POINT(0 0)"), to_geometry("LINESTRING(0 0, 0 2)")) +optimized expr : false +output type : Boolean +output domain : {FALSE} +output : false + + +ast : ST_DISJOINT(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')) +raw expr : ST_DISJOINT(TO_GEOMETRY('POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))'), TO_GEOMETRY('POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))')) +checked expr : st_disjoint(to_geometry("POLYGON((0 0, 0 2, 2 2, 2 0, 0 0))"), to_geometry("POLYGON((1 1, 3 1, 3 3, 1 3, 1 1))")) +optimized expr : false +output type : Boolean +output domain : {FALSE} +output : false + + +ast : ST_WITHIN(TO_GEOMETRY('POINT(1 2)'), TO_GEOMETRY('LINESTRING(0 0, 2 4)')) +raw expr : ST_WITHIN(TO_GEOMETRY('POINT(1 2)'), TO_GEOMETRY('LINESTRING(0 0, 2 4)')) +checked expr : st_within(to_geometry("POINT(1 2)"), to_geometry("LINESTRING(0 0, 2 4)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_WITHIN(TO_GEOMETRY('POINT(10 20)'), TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))')) +raw expr : ST_WITHIN(TO_GEOMETRY('POINT(10 20)'), TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))')) +checked expr : st_within(to_geometry("POINT(10 20)"), to_geometry("POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_WITHIN(TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))'), TO_GEOMETRY('POINT(10 20)')) +raw expr : ST_WITHIN(TO_GEOMETRY('POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))'), TO_GEOMETRY('POINT(10 20)')) +checked expr : st_within(to_geometry("POLYGON((0 0, 0 40, 40 40, 40 0, 0 0))"), to_geometry("POINT(10 20)")) +optimized expr : false +output type : Boolean +output domain : {FALSE} +output : false + + +ast : ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)')) +raw expr : ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)')) +checked expr : st_equals(to_geometry("LINESTRING(0 
0, 10 10)"), to_geometry("LINESTRING(0 0, 5 5, 10 10)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 10 10)')) +raw expr : ST_EQUALS(TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)'), TO_GEOMETRY('LINESTRING(0 0, 10 10)')) +checked expr : st_equals(to_geometry("LINESTRING(0 0, 5 5, 10 10)"), to_geometry("LINESTRING(0 0, 10 10)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + +ast : ST_EQUALS(TO_GEOMETRY('LINESTRING(10 10, 1 2)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)')) +raw expr : ST_EQUALS(TO_GEOMETRY('LINESTRING(10 10, 1 2)'), TO_GEOMETRY('LINESTRING(0 0, 5 5, 10 10)')) +checked expr : st_equals(to_geometry("LINESTRING(10 10, 1 2)"), to_geometry("LINESTRING(0 0, 5 5, 10 10)")) +optimized expr : false +output type : Boolean +output domain : {FALSE} +output : false + + +ast : ST_EQUALS(TO_GEOMETRY('POINT(10 10)'), TO_GEOMETRY('LINESTRING(10 10, 10 10)')) +raw expr : ST_EQUALS(TO_GEOMETRY('POINT(10 10)'), TO_GEOMETRY('LINESTRING(10 10, 10 10)')) +checked expr : st_equals(to_geometry("POINT(10 10)"), to_geometry("LINESTRING(10 10, 10 10)")) +optimized expr : true +output type : Boolean +output domain : {TRUE} +output : true + + diff --git a/tests/sqllogictests/suites/query/functions/02_0060_function_geometry.test b/tests/sqllogictests/suites/query/functions/02_0060_function_geometry.test index 136d80fc21a0..e66be9111ff7 100644 --- a/tests/sqllogictests/suites/query/functions/02_0060_function_geometry.test +++ b/tests/sqllogictests/suites/query/functions/02_0060_function_geometry.test @@ -636,7 +636,45 @@ SELECT ST_NPOINTS(g), ST_ASWKT(g) FROM geometry_shapes; 8 MULTIPOLYGON(((-10 0,0 10,10 0,-10 0)),((-10 40,10 40,0 20,-10 40))) 8 GEOMETRYCOLLECTION(POLYGON((-10 0,0 10,10 0,-10 0)),LINESTRING(40 60,50 50,60 40),POINT(99 11)) +query T +SELECT st_intersects(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))'), st_geomfromtext('SRID=4326;POLYGON((-84.191605 39.758949, -75.165222 39.952583, -78.878738 42.880230, -84.191605 39.758949))')); +---- +1 + +query T +SELECT st_intersects(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))'), st_geomfromtext('SRID=4326;POLYGON((-79.995888 40.440624,-74.666728 40.358244, -76.5 42.443333, -79.995888 40.440624))')); +---- +0 + +query T +SELECT st_disjoint(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902), (-87.623177 41.881832, -90.199402 38.627003, -82.446732 38.413651, -87.623177 41.881832))'), st_geomfromtext('SRID=4326;POLYGON((-87.356934 41.595161, -84.512016 39.103119, -86.529167 39.162222, -87.356934 41.595161))')); +---- +1 + +query T +SELECT st_disjoint(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))'), st_geomfromtext('SRID=4326;POLYGON((-84.191605 39.758949, -75.165222 39.952583, -78.878738 42.880230, -84.191605 39.758949))')); +---- +0 +query T +SELECT ST_Within(st_geomfromtext('SRID=4326;POLYGON((-87.623177 41.881832, -90.199402 38.627003, -82.446732 38.413651, -87.623177 41.881832))'), st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))')); +---- +1 + +query T +SELECT 
ST_Within(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902), (-87.623177 41.881832, -90.199402 38.627003, -82.446732 38.413651, -87.623177 41.881832))'), st_geomfromtext('SRID=4326;POLYGON((-87.356934 41.595161, -84.512016 39.103119, -86.529167 39.162222, -87.356934 41.595161))')); +---- +0 + +query T +SELECT st_equals(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))'), st_geomfromtext('SRID=4326;POLYGON((-95.992775 36.153980, -87.906471 43.038902, -75.704722 36.076944, -95.992775 36.153980))')); +---- +1 + +query T +SELECT st_equals(st_geomfromtext('SRID=4326;POLYGON((-87.906471 43.038902, -95.992775 36.153980, -75.704722 36.076944, -87.906471 43.038902))'), st_geomfromtext('SRID=4326;POLYGON((-84.191605 39.758949, -75.165222 39.952583, -78.878738 42.880230, -84.191605 39.758949))')); +---- +0 statement ok SET enable_geo_create_table=0 From 06c4ab6c1bd070beeb5bd086197241fd5a2d1881 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 28 Nov 2024 19:03:43 +0800 Subject: [PATCH 4/7] fix(query): make memory engine as non-local table (#16955) * fix(query): disallow insert into local table in cluster mode * fix(query): make null table / memory table distributed * update * update * update --- src/query/catalog/src/table.rs | 4 ++ .../src/interpreters/interpreter_insert.rs | 13 ++--- src/query/storages/fuse/src/fuse_table.rs | 4 ++ src/query/storages/memory/src/memory_part.rs | 18 +++---- src/query/storages/memory/src/memory_table.rs | 53 +++++++------------ src/query/storages/null/src/null_table.rs | 5 ++ .../cluster/distributed_copy_into_table.test | 19 +++++++ 7 files changed, 65 insertions(+), 51 deletions(-) diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 916d170f1563..58b7f281f1c3 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -108,6 +108,10 @@ pub trait Table: Sync + Send { false } + fn support_distributed_insert(&self) -> bool { + false + } + /// whether table has the exact number of total rows fn has_exact_total_row_count(&self) -> bool { false diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index c98267460bab..f9d951033bd9 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -139,7 +139,7 @@ impl Interpreter for InsertInterpreter { } InsertInputSource::SelectPlan(plan) => { let table1 = table.clone(); - let (mut select_plan, select_column_bindings, metadata) = match plan.as_ref() { + let (select_plan, select_column_bindings, metadata) = match plan.as_ref() { Plan::Query { s_expr, metadata, @@ -166,10 +166,11 @@ impl Interpreter for InsertInterpreter { dml_build_update_stream_req(self.ctx.clone(), metadata).await?; // here we remove the last exchange merge plan to trigger distribute insert - let insert_select_plan = match select_plan { - PhysicalPlan::Exchange(ref mut exchange) => { - // insert can be dispatched to different nodes + let insert_select_plan = match (select_plan, table.support_distributed_insert()) { + (PhysicalPlan::Exchange(ref mut exchange), true) => { + // insert can be dispatched to different nodes if table support_distributed_insert let input = exchange.input.clone(); + exchange.input = Box::new(PhysicalPlan::DistributedInsertSelect(Box::new( DistributedInsertSelect { // 
TODO(leiysky): we reuse the id of exchange here, @@ -183,9 +184,9 @@ impl Interpreter for InsertInterpreter { cast_needed: self.check_schema_cast(plan)?, }, ))); - select_plan + PhysicalPlan::Exchange(exchange.clone()) } - other_plan => { + (other_plan, _) => { // insert should wait until all nodes finished PhysicalPlan::DistributedInsertSelect(Box::new(DistributedInsertSelect { // TODO: we reuse the id of other plan here, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index b10658fba01e..f90b616b09c8 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -568,6 +568,10 @@ impl Table for FuseTable { true } + fn support_distributed_insert(&self) -> bool { + true + } + fn has_exact_total_row_count(&self) -> bool { true } diff --git a/src/query/storages/memory/src/memory_part.rs b/src/query/storages/memory/src/memory_part.rs index bd52eb3469e6..531ebd044582 100644 --- a/src/query/storages/memory/src/memory_part.rs +++ b/src/query/storages/memory/src/memory_part.rs @@ -16,15 +16,13 @@ use std::any::Any; use std::sync::Arc; use databend_common_catalog::plan::PartInfo; +use databend_common_catalog::plan::PartInfoPtr; +/// Memory table lazy partition information. #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct MemoryPartInfo { - pub total: usize, - pub part_start: usize, - pub part_end: usize, -} +pub struct MemoryPartInfo {} -#[typetag::serde(name = "memory")] +#[typetag::serde(name = "memory_part")] impl PartInfo for MemoryPartInfo { fn as_any(&self) -> &dyn Any { self @@ -42,11 +40,7 @@ impl PartInfo for MemoryPartInfo { } impl MemoryPartInfo { - pub fn create(start: usize, end: usize, total: usize) -> Arc> { - Arc::new(Box::new(MemoryPartInfo { - total, - part_start: start, - part_end: end, - })) + pub fn create() -> PartInfoPtr { + Arc::new(Box::new(MemoryPartInfo {})) } } diff --git a/src/query/storages/memory/src/memory_table.rs b/src/query/storages/memory/src/memory_table.rs index 816fd71d6b83..b1830ac3c4f2 100644 --- a/src/query/storages/memory/src/memory_table.rs +++ b/src/query/storages/memory/src/memory_table.rs @@ -123,31 +123,6 @@ impl MemoryTable { Arc::new(Mutex::new(read_data_blocks)) } - - pub fn generate_memory_parts(start: usize, workers: usize, total: usize) -> Partitions { - let part_size = total / workers; - let part_remain = total % workers; - - let mut partitions = Vec::with_capacity(workers); - if part_size == 0 { - partitions.push(MemoryPartInfo::create(start, total, total)); - } else { - for part in 0..workers { - let mut part_begin = part * part_size; - if part == 0 && start > 0 { - part_begin = start; - } - let mut part_end = (part + 1) * part_size; - if part == (workers - 1) && part_remain > 0 { - part_end += part_remain; - } - - partitions.push(MemoryPartInfo::create(part_begin, part_end, total)); - } - } - - Partitions::create(PartitionsShuffleKind::Seq, partitions) - } } #[async_trait::async_trait] @@ -160,6 +135,12 @@ impl Table for MemoryTable { &self.table_info } + /// MemoryTable could be distributed table, yet we only insert data in one node per query + /// Because commit_insert did not support distributed transaction + fn is_local(&self) -> bool { + false + } + fn support_column_projection(&self) -> bool { true } @@ -176,8 +157,7 @@ impl Table for MemoryTable { _dry_run: bool, ) -> Result<(PartStatistics, Partitions)> { let blocks = self.blocks.read(); - - let statistics = match push_downs { + let mut statistics = match 
push_downs { Some(push_downs) => { let projection_filter: Box bool> = match push_downs.projection { Some(prj) => { @@ -214,12 +194,19 @@ impl Table for MemoryTable { } }; - let parts = Self::generate_memory_parts( - 0, - ctx.get_settings().get_max_threads()? as usize, - blocks.len(), - ); - Ok((statistics, parts)) + let cluster = ctx.get_cluster(); + if !cluster.is_empty() { + statistics.read_bytes = statistics.read_bytes.max(cluster.nodes.len()); + statistics.read_rows = statistics.read_rows.max(cluster.nodes.len()); + statistics.partitions_total = statistics.partitions_total.max(cluster.nodes.len()); + statistics.partitions_scanned = statistics.partitions_scanned.max(cluster.nodes.len()); + } + + let parts = vec![MemoryPartInfo::create()]; + return Ok(( + statistics, + Partitions::create(PartitionsShuffleKind::Broadcast, parts), + )); } fn read_data( diff --git a/src/query/storages/null/src/null_table.rs b/src/query/storages/null/src/null_table.rs index cb9156286edd..5bb625429694 100644 --- a/src/query/storages/null/src/null_table.rs +++ b/src/query/storages/null/src/null_table.rs @@ -61,6 +61,11 @@ impl Table for NullTable { &self.table_info } + /// Null do not keep data, it's safe to make it non-local. + fn is_local(&self) -> bool { + false + } + #[async_backtrace::framed] async fn read_partitions( &self, diff --git a/tests/sqllogictests/suites/mode/cluster/distributed_copy_into_table.test b/tests/sqllogictests/suites/mode/cluster/distributed_copy_into_table.test index 80172fe09feb..a5a51cdd9466 100644 --- a/tests/sqllogictests/suites/mode/cluster/distributed_copy_into_table.test +++ b/tests/sqllogictests/suites/mode/cluster/distributed_copy_into_table.test @@ -183,5 +183,24 @@ select count(*) from test_order; ---- 4000000 +statement ok +create or replace table test_memory engine = Memory as select number, (number + 1) d from numbers(100000); + +query I +select count() from test_memory +---- +100000 + +statement ok +insert into test_memory select number, sum(number) from numbers(100000) group by number; + +query I +select count() from test_memory +---- +200000 + +statement ok +drop table test_memory; + statement ok set enable_distributed_copy_into = 0; From 7cff1351d80191617fe7e3352ede9e641489fd6c Mon Sep 17 00:00:00 2001 From: baishen Date: Thu, 28 Nov 2024 19:17:11 +0800 Subject: [PATCH 5/7] feat(query): virtual column allow cast to other type (#16903) * feat(query): virtual column allow cast to other type * add tests * fix typos * fix --- Cargo.lock | 1 + src/meta/api/src/schema_api_test_suite.rs | 187 ++++++++++- src/meta/app/src/schema/virtual_column.rs | 7 +- src/meta/proto-conv/src/util.rs | 1 + .../virtual_column_from_to_protobuf_impl.rs | 38 ++- src/meta/proto-conv/tests/it/main.rs | 2 + .../tests/it/v041_virtual_column.rs | 12 +- ...v111_add_glue_as_iceberg_catalog_option.rs | 37 ++- .../tests/it/v112_virtual_column.rs | 76 +++++ src/meta/protos/proto/virtual_column.proto | 5 + src/query/catalog/src/plan/pushdown.rs | 2 + .../fuse/operations/virtual_columns.rs | 33 +- .../virtual_column/virtual_column_handler.rs | 3 +- .../fuse/operations/virtual_columns.rs | 20 +- .../ee_features/virtual_column/Cargo.toml | 1 + .../virtual_column/src/virtual_column.rs | 5 +- .../physical_plans/physical_table_scan.rs | 10 + .../sql/src/planner/binder/bind_context.rs | 25 ++ .../planner/binder/bind_query/bind_select.rs | 19 +- .../src/planner/binder/ddl/virtual_column.rs | 81 ++++- src/query/sql/src/planner/binder/table.rs | 1 + .../src/planner/plans/ddl/virtual_column.rs | 7 +- 
.../sql/src/planner/semantic/type_check.rs | 192 +++++++++++ .../semantic/virtual_column_rewriter.rs | 297 +----------------- .../block/block_reader_native_deserialize.rs | 3 +- .../virtual_column_reader_parquet.rs | 14 +- .../read/native_data_source_deserializer.rs | 24 +- .../system/src/virtual_columns_table.rs | 15 +- src/tests/sqlsmith/src/sql_gen/ddl.rs | 8 +- .../01_ee_system/01_0002_virtual_column.test | 194 ++++++------ 30 files changed, 866 insertions(+), 454 deletions(-) create mode 100644 src/meta/proto-conv/tests/it/v112_virtual_column.rs diff --git a/Cargo.lock b/Cargo.lock index e3301c4633db..33a6fb65cf77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,6 +4832,7 @@ dependencies = [ "databend-common-base", "databend-common-catalog", "databend-common-exception", + "databend-common-expression", "databend-common-meta-app", "databend-common-pipeline-core", "databend-common-storages-fuse", diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index eb4cb386e32b..e5ffb7539ec7 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -6416,7 +6416,26 @@ impl SchemaApiTestSuite { let req = CreateVirtualColumnReq { create_option: CreateOption::Create, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()], + virtual_columns: vec![ + ( + "variant:k1".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant[1]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant:k1:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k1:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64, + ))), + ), + ], }; mt.create_virtual_column(req.clone()).await?; @@ -6425,7 +6444,26 @@ impl SchemaApiTestSuite { let req = CreateVirtualColumnReq { create_option: CreateOption::Create, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()], + virtual_columns: vec![ + ( + "variant:k1".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant[1]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant:k1:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k1:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64, + ))), + ), + ], }; let res = mt.create_virtual_column(req).await; @@ -6439,8 +6477,24 @@ impl SchemaApiTestSuite { let res = mt.list_virtual_columns(req).await?; assert_eq!(1, res.len()); assert_eq!(res[0].virtual_columns, vec![ - "variant:k1".to_string(), - "variant[1]".to_string(), + ( + "variant:k1".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant[1]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant:k1:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k1:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64 + ))), + ), ]); let req = ListVirtualColumnsReq::new(&tenant, Some(u64::MAX)); @@ -6454,7 +6508,26 @@ impl SchemaApiTestSuite { let req = UpdateVirtualColumnReq { if_exists: false, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k2".to_string(), "variant[2]".to_string()], + 
virtual_columns: vec![ + ( + "variant:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant[2]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant:k2:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k2:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64, + ))), + ), + ], }; mt.update_virtual_column(req).await?; @@ -6467,8 +6540,24 @@ impl SchemaApiTestSuite { let res = mt.list_virtual_columns(req).await?; assert_eq!(1, res.len()); assert_eq!(res[0].virtual_columns, vec![ - "variant:k2".to_string(), - "variant[2]".to_string(), + ( + "variant:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant[2]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant:k2:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k2:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64 + ))), + ), ]); } @@ -6495,7 +6584,26 @@ impl SchemaApiTestSuite { let req = UpdateVirtualColumnReq { if_exists: false, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k3".to_string(), "variant[3]".to_string()], + virtual_columns: vec![ + ( + "variant:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant[3]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant:k3:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k3:k5".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64, + ))), + ), + ], }; let res = mt.update_virtual_column(req).await; @@ -6507,7 +6615,26 @@ impl SchemaApiTestSuite { let req = CreateVirtualColumnReq { create_option: CreateOption::Create, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k1".to_string(), "variant[1]".to_string()], + virtual_columns: vec![ + ( + "variant:k1".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant[1]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "variant:k1:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k1:k5".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64, + ))), + ), + ], }; mt.create_virtual_column(req.clone()).await?; @@ -6517,14 +6644,39 @@ impl SchemaApiTestSuite { let res = mt.list_virtual_columns(req).await?; assert_eq!(1, res.len()); assert_eq!(res[0].virtual_columns, vec![ - "variant:k1".to_string(), - "variant[1]".to_string(), + ( + "variant:k1".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant[1]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant:k1:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ( + "variant:k1:k5".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number( + NumberDataType::UInt64 + ))), + ), ]); let req = CreateVirtualColumnReq { create_option: CreateOption::CreateOrReplace, name_ident: name_ident.clone(), - virtual_columns: vec!["variant:k2".to_string()], + virtual_columns: vec![ + ( + "variant:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + 
"variant:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ], }; mt.create_virtual_column(req.clone()).await?; @@ -6533,7 +6685,16 @@ impl SchemaApiTestSuite { let res = mt.list_virtual_columns(req).await?; assert_eq!(1, res.len()); - assert_eq!(res[0].virtual_columns, vec!["variant:k2".to_string(),]); + assert_eq!(res[0].virtual_columns, vec![ + ( + "variant:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)) + ), + ( + "variant:k3".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ) + ]); } Ok(()) diff --git a/src/meta/app/src/schema/virtual_column.rs b/src/meta/app/src/schema/virtual_column.rs index 0470ef4d1235..70c566530163 100644 --- a/src/meta/app/src/schema/virtual_column.rs +++ b/src/meta/app/src/schema/virtual_column.rs @@ -18,6 +18,7 @@ use std::fmt::Formatter; use chrono::DateTime; use chrono::Utc; +use databend_common_expression::TableDataType; use databend_common_meta_types::MetaId; use super::CreateOption; @@ -29,7 +30,7 @@ use crate::tenant::ToTenant; pub struct VirtualColumnMeta { pub table_id: MetaId, - pub virtual_columns: Vec, + pub virtual_columns: Vec<(String, TableDataType)>, pub created_on: DateTime, pub updated_on: Option>, } @@ -38,7 +39,7 @@ pub struct VirtualColumnMeta { pub struct CreateVirtualColumnReq { pub create_option: CreateOption, pub name_ident: VirtualColumnIdent, - pub virtual_columns: Vec, + pub virtual_columns: Vec<(String, TableDataType)>, } impl Display for CreateVirtualColumnReq { @@ -56,7 +57,7 @@ impl Display for CreateVirtualColumnReq { pub struct UpdateVirtualColumnReq { pub if_exists: bool, pub name_ident: VirtualColumnIdent, - pub virtual_columns: Vec, + pub virtual_columns: Vec<(String, TableDataType)>, } impl Display for UpdateVirtualColumnReq { diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 1bfefc708568..6f82943521ac 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -141,6 +141,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (109, "2024-08-29: Refactor: ProcedureMeta add arg_names"), (110, "2024-09-18: Add: database.proto: DatabaseMeta.gc_in_progress"), (111, "2024-11-13: Add: Enable AWS Glue as an Apache Iceberg type when creating a catalog."), + (112, "2024-11-28: Add: virtual_column add data_types field"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. 
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/src/virtual_column_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/virtual_column_from_to_protobuf_impl.rs index 5001bacdeb89..afb0b6a7f2ed 100644 --- a/src/meta/proto-conv/src/virtual_column_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/virtual_column_from_to_protobuf_impl.rs @@ -17,6 +17,7 @@ use chrono::DateTime; use chrono::Utc; +use databend_common_expression::TableDataType; use databend_common_meta_app::schema as mt; use databend_common_protos::pb; @@ -36,10 +37,36 @@ impl FromToProto for mt::VirtualColumnMeta { fn from_pb(p: Self::PB) -> Result where Self: Sized { reader_check_msg(p.ver, p.min_reader_ver)?; + let virtual_columns = if p.data_types.is_empty() { + p.virtual_columns + .iter() + .map(|v| { + ( + v.clone(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ) + }) + .collect() + } else { + if p.virtual_columns.len() != p.data_types.len() { + return Err(Incompatible { + reason: format!( + "Incompatible virtual columns length is {}, but data types length is {}", + p.virtual_columns.len(), + p.data_types.len() + ), + }); + } + let mut virtual_columns = Vec::new(); + for (v, ty) in p.virtual_columns.iter().zip(p.data_types.iter()) { + virtual_columns.push((v.clone(), TableDataType::from_pb(ty.clone())?)); + } + virtual_columns + }; let v = Self { table_id: p.table_id, - virtual_columns: p.virtual_columns, + virtual_columns, created_on: DateTime::::from_pb(p.created_on)?, updated_on: match p.updated_on { Some(updated_on) => Some(DateTime::::from_pb(updated_on)?), @@ -50,16 +77,23 @@ impl FromToProto for mt::VirtualColumnMeta { } fn to_pb(&self) -> Result { + let mut data_types = Vec::new(); + let mut virtual_columns = Vec::new(); + for (v, ty) in self.virtual_columns.iter() { + data_types.push(ty.to_pb()?); + virtual_columns.push(v.clone()); + } let p = pb::VirtualColumnMeta { ver: VER, min_reader_ver: MIN_READER_VER, table_id: self.table_id, - virtual_columns: self.virtual_columns.clone(), + virtual_columns, created_on: self.created_on.to_pb()?, updated_on: match self.updated_on { Some(updated_on) => Some(updated_on.to_pb()?), None => None, }, + data_types, }; Ok(p) } diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 2ab430fb1ad1..5c03fc8db8e8 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -108,3 +108,5 @@ mod v107_geography_datatype; mod v108_procedure; mod v109_procedure_with_args; mod v110_database_meta_gc_in_progress; +mod v111_add_glue_as_iceberg_catalog_option; +mod v112_virtual_column; diff --git a/src/meta/proto-conv/tests/it/v041_virtual_column.rs b/src/meta/proto-conv/tests/it/v041_virtual_column.rs index 922f879cdec5..e3719ce580c6 100644 --- a/src/meta/proto-conv/tests/it/v041_virtual_column.rs +++ b/src/meta/proto-conv/tests/it/v041_virtual_column.rs @@ -14,6 +14,7 @@ use chrono::TimeZone; use chrono::Utc; +use databend_common_expression::TableDataType; use databend_common_meta_app::schema::VirtualColumnMeta; use fastrace::func_name; @@ -40,7 +41,16 @@ fn test_decode_v41_virtual_column() -> anyhow::Result<()> { let want = || { let table_id = 7; - let virtual_columns = vec!["v:k1:k2".to_string(), "v[1][2]".to_string()]; + let virtual_columns = vec![ + ( + "v:k1:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "v[1][2]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + 
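+                // Both entries decode as Nullable(Variant): v041 bytes predate
+                // the data_types field, so from_pb falls back to the default.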
), + ]; let created_on = Utc.with_ymd_and_hms(2023, 3, 9, 10, 0, 0).unwrap(); let updated_on = Some(Utc.with_ymd_and_hms(2023, 5, 29, 10, 0, 0).unwrap()); diff --git a/src/meta/proto-conv/tests/it/v111_add_glue_as_iceberg_catalog_option.rs b/src/meta/proto-conv/tests/it/v111_add_glue_as_iceberg_catalog_option.rs index 478175a338b5..dc146b15753e 100644 --- a/src/meta/proto-conv/tests/it/v111_add_glue_as_iceberg_catalog_option.rs +++ b/src/meta/proto-conv/tests/it/v111_add_glue_as_iceberg_catalog_option.rs @@ -18,7 +18,7 @@ use databend_common_meta_app::schema::CatalogOption; use databend_common_meta_app::schema::IcebergCatalogOption; use databend_common_meta_app::schema::IcebergGlueCatalogOption; use fastrace::func_name; -use std::collections::HashMap; +use maplit::hashmap; use crate::common; @@ -35,24 +35,27 @@ use crate::common; #[test] fn test_v111_add_glue_as_iceberg_catalog_option() -> anyhow::Result<()> { let catalog_meta_v111 = vec![ - 18, 55, 26, 53, 18, 45, 10, 21, 104, 116, 116, 112, 58, 47, 47, 49, 50, 55, 46, 48, 46, 48, - 46, 49, 58, 57, 57, 48, 48, 18, 14, 115, 51, 58, 47, 47, 109, 121, 95, 98, 117, 99, 107, - 101, 116, 160, 6, 98, 168, 6, 24, 160, 6, 98, 168, 6, 24, 162, 1, 23, 50, 48, 49, 52, 45, - 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 160, 6, 98, 168, 6, - 24, + 18, 169, 1, 26, 166, 1, 34, 157, 1, 10, 14, 115, 51, 58, 47, 47, 109, 121, 95, 98, 117, 99, + 107, 101, 116, 18, 37, 10, 10, 65, 87, 83, 95, 75, 69, 89, 95, 73, 68, 18, 23, 115, 117, + 112, 101, 114, 32, 115, 101, 99, 117, 114, 101, 32, 97, 99, 99, 101, 115, 115, 32, 107, + 101, 121, 18, 45, 10, 14, 65, 87, 83, 95, 83, 69, 67, 82, 69, 84, 95, 75, 69, 89, 18, 27, + 101, 118, 101, 110, 32, 109, 111, 114, 101, 32, 115, 101, 99, 117, 114, 101, 32, 115, 101, + 99, 114, 101, 116, 32, 107, 101, 121, 18, 47, 10, 6, 82, 69, 71, 73, 79, 78, 18, 37, 117, + 115, 45, 101, 97, 115, 116, 45, 49, 32, 97, 107, 97, 32, 97, 110, 116, 105, 45, 109, 117, + 108, 116, 105, 45, 97, 118, 97, 105, 108, 97, 98, 105, 108, 105, 116, 121, 160, 6, 111, + 168, 6, 24, 160, 6, 111, 168, 6, 24, 162, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, + 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 160, 6, 111, 168, 6, 24, ]; - let mut props = HashMap::new(); - props.insert("AWS_KEY_ID".to_string(), "super secure access key".to_string()); - props.insert("AWS_SECRET_KEY".to_string(), "even more secure secret key".to_string()); - props.insert("REGION".to_string(), "us-east-1 aka anti-multi-availability".to_string()); - let want = || databend_common_meta_app::schema::CatalogMeta { - catalog_option: CatalogOption::Iceberg(IcebergGlueCatalogOption::Rest( + catalog_option: CatalogOption::Iceberg(IcebergCatalogOption::Glue( IcebergGlueCatalogOption { - address: "http://127.0.0.1:9900".to_string(), warehouse: "s3://my_bucket".to_string(), - props, + props: hashmap! 
{ + s("AWS_KEY_ID") => s("super secure access key"), + s("AWS_SECRET_KEY") => s("even more secure secret key"), + s("REGION") => s("us-east-1 aka anti-multi-availability"), + }, }, )), created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), @@ -62,4 +65,8 @@ fn test_v111_add_glue_as_iceberg_catalog_option() -> anyhow::Result<()> { common::test_load_old(func_name!(), catalog_meta_v111.as_slice(), 111, want())?; Ok(()) -} \ No newline at end of file +} + +fn s(ss: impl ToString) -> String { + ss.to_string() +} diff --git a/src/meta/proto-conv/tests/it/v112_virtual_column.rs b/src/meta/proto-conv/tests/it/v112_virtual_column.rs new file mode 100644 index 000000000000..dce9eadd5862 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v112_virtual_column.rs @@ -0,0 +1,76 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression::TableDataType; +use databend_common_meta_app::schema::VirtualColumnMeta; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. 
* +// ************************************************************* +// +// The message bytes are built from the output of `proto_conv::test_build_pb_buf()` +#[test] +fn test_decode_v112_virtual_column() -> anyhow::Result<()> { + let schema_v112 = vec![ + 8, 7, 18, 7, 118, 58, 107, 49, 58, 107, 50, 18, 7, 118, 91, 49, 93, 91, 50, 93, 18, 7, 118, + 58, 107, 51, 58, 107, 52, 26, 23, 50, 48, 50, 51, 45, 48, 51, 45, 48, 57, 32, 49, 48, 58, + 48, 48, 58, 48, 48, 32, 85, 84, 67, 34, 23, 50, 48, 50, 51, 45, 48, 53, 45, 50, 57, 32, 49, + 48, 58, 48, 48, 58, 48, 48, 32, 85, 84, 67, 42, 18, 178, 2, 9, 210, 2, 0, 160, 6, 111, 168, + 6, 24, 160, 6, 111, 168, 6, 24, 42, 18, 178, 2, 9, 210, 2, 0, 160, 6, 111, 168, 6, 24, 160, + 6, 111, 168, 6, 24, 42, 18, 178, 2, 9, 146, 2, 0, 160, 6, 111, 168, 6, 24, 160, 6, 111, + 168, 6, 24, 160, 6, 112, 168, 6, 24, + ]; + + let want = || { + let table_id = 7; + let virtual_columns = vec![ + ( + "v:k1:k2".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "v[1][2]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "v:k3:k4".to_string(), + TableDataType::Nullable(Box::new(TableDataType::String)), + ), + ]; + let created_on = Utc.with_ymd_and_hms(2023, 3, 9, 10, 0, 0).unwrap(); + let updated_on = Some(Utc.with_ymd_and_hms(2023, 5, 29, 10, 0, 0).unwrap()); + + VirtualColumnMeta { + table_id, + virtual_columns, + created_on, + updated_on, + } + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), schema_v112.as_slice(), 112, want())?; + + Ok(()) +} diff --git a/src/meta/protos/proto/virtual_column.proto b/src/meta/protos/proto/virtual_column.proto index 32407d53eab9..92692f9e2c3b 100644 --- a/src/meta/protos/proto/virtual_column.proto +++ b/src/meta/protos/proto/virtual_column.proto @@ -20,6 +20,8 @@ syntax = "proto3"; package databend_proto; +import "datatype.proto"; + // VirtualColumnMeta is a container of virtual columns information. message VirtualColumnMeta { uint64 ver = 100; @@ -36,4 +38,7 @@ message VirtualColumnMeta { // The time virtual column updated. optional string updated_on = 4; + + // virtual column data type + repeated DataType data_types = 5; } diff --git a/src/query/catalog/src/plan/pushdown.rs b/src/query/catalog/src/plan/pushdown.rs index b91c0db0309e..c0622048a367 100644 --- a/src/query/catalog/src/plan/pushdown.rs +++ b/src/query/catalog/src/plan/pushdown.rs @@ -60,6 +60,8 @@ pub struct VirtualColumnField { pub name: String, /// Paths to generate virtual column from source column. pub key_paths: Scalar, + /// optional cast function name, used to cast value to other type. + pub cast_func_name: Option, /// Virtual column data type. 
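+    /// When this is not a variant type, the value is derived from the source
+    /// column by applying the cast recorded in `cast_func_name` above.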
pub data_type: Box, /// Is the virtual column is created, diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs index 867fb87ad43a..705c87663251 100644 --- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs @@ -32,6 +32,7 @@ use databend_common_expression::Evaluator; use databend_common_expression::Expr; use databend_common_expression::TableDataType; use databend_common_expression::TableField; +use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; @@ -82,7 +83,7 @@ use opendal::Operator; pub async fn do_refresh_virtual_column( ctx: Arc, fuse_table: &FuseTable, - virtual_columns: Vec, + virtual_columns: Vec<(String, TableDataType)>, segment_locs: Option>, pipeline: &mut Pipeline, ) -> Result<()> { @@ -106,7 +107,7 @@ pub async fn do_refresh_virtual_column( if f.data_type().remove_nullable() != TableDataType::Variant { continue; } - let is_src_field = virtual_columns.iter().any(|v| v.starts_with(f.name())); + let is_src_field = virtual_columns.iter().any(|v| v.0.starts_with(f.name())); if is_src_field { field_indices.push(i); } @@ -154,7 +155,7 @@ pub async fn do_refresh_virtual_column( let virtual_loc = TableMetaLocationGenerator::gen_virtual_block_location(&block_meta.location.0); - let schema = match storage_format { + let arrow_schema = match storage_format { FuseStorageFormat::Parquet => { read_parquet_schema_async_rs(operator, &virtual_loc, None) .await @@ -168,10 +169,14 @@ pub async fn do_refresh_virtual_column( }; // if all virtual columns has be generated, we can ignore this block - let all_generated = if let Some(schema) = schema { - virtual_columns - .iter() - .all(|virtual_name| schema.fields.iter().any(|f| f.name() == virtual_name)) + let all_generated = if let Some(arrow_schema) = arrow_schema { + let virtual_table_schema = TableSchema::try_from(&arrow_schema)?; + virtual_columns.iter().all(|v| { + virtual_table_schema + .fields + .iter() + .any(|f| *f.name() == v.0 && *f.data_type() == v.1) + }) } else { false }; @@ -203,9 +208,18 @@ pub async fn do_refresh_virtual_column( let mut virtual_fields = Vec::with_capacity(virtual_columns.len()); let mut virtual_exprs = Vec::with_capacity(virtual_columns.len()); - for virtual_column in virtual_columns { - let virtual_expr = + for (virtual_column, virtual_type) in virtual_columns { + let mut virtual_expr = parse_computed_expr(ctx.clone(), source_schema.clone(), &virtual_column)?; + + if virtual_type.remove_nullable() != TableDataType::Variant { + virtual_expr = Expr::Cast { + span: None, + is_try: true, + expr: Box::new(virtual_expr), + dest_type: (&virtual_type).into(), + } + } let virtual_field = TableField::new( &virtual_column, infer_schema_type(virtual_expr.data_type())?, @@ -343,6 +357,7 @@ impl AsyncTransform for VirtualColumnTransform { let virtual_block = DataBlock::new(virtual_entries, len); let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); + let _ = serialize_block( &self.write_settings, &self.virtual_schema, diff --git a/src/query/ee/src/virtual_column/virtual_column_handler.rs b/src/query/ee/src/virtual_column/virtual_column_handler.rs index c400ed106f34..c6ad11019d36 100644 --- a/src/query/ee/src/virtual_column/virtual_column_handler.rs +++ b/src/query/ee/src/virtual_column/virtual_column_handler.rs @@ -18,6 
+18,7 @@ use databend_common_base::base::GlobalInstance; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::TableDataType; use databend_common_meta_app::schema::CreateVirtualColumnReq; use databend_common_meta_app::schema::DropVirtualColumnReq; use databend_common_meta_app::schema::ListVirtualColumnsReq; @@ -75,7 +76,7 @@ impl VirtualColumnHandler for RealVirtualColumnHandler { &self, ctx: Arc, fuse_table: &FuseTable, - virtual_columns: Vec, + virtual_columns: Vec<(String, TableDataType)>, segment_locs: Option>, pipeline: &mut Pipeline, ) -> Result<()> { diff --git a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs index 5c9d3d0ab3ff..2259feaf9bf7 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/virtual_columns.rs @@ -14,6 +14,8 @@ use databend_common_base::base::tokio; use databend_common_exception::Result; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::TableDataType; use databend_common_storage::read_parquet_schema_async_rs; use databend_common_storages_fuse::io::BlockReader; use databend_common_storages_fuse::io::MetaReaders; @@ -47,7 +49,20 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> { let fuse_table = FuseTable::try_from_table(table.as_ref())?; let dal = fuse_table.get_operator_ref(); - let virtual_columns = vec!["v['a']".to_string(), "v[0]".to_string()]; + let virtual_columns = vec![ + ( + "v['a']".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "v[0]".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Variant)), + ), + ( + "v['b']".to_string(), + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int64))), + ), + ]; let table_ctx = fixture.new_query_ctx().await?; let snapshot_opt = fuse_table.read_table_snapshot().await?; @@ -111,9 +126,10 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> { }; assert!(schema.is_some()); let schema = schema.unwrap(); - assert_eq!(schema.fields.len(), 2); + assert_eq!(schema.fields.len(), 3); assert_eq!(schema.fields[0].name(), "v['a']"); assert_eq!(schema.fields[1].name(), "v[0]"); + assert_eq!(schema.fields[2].name(), "v['b']"); } } diff --git a/src/query/ee_features/virtual_column/Cargo.toml b/src/query/ee_features/virtual_column/Cargo.toml index cd95d376abaf..6766f630b6c3 100644 --- a/src/query/ee_features/virtual_column/Cargo.toml +++ b/src/query/ee_features/virtual_column/Cargo.toml @@ -18,6 +18,7 @@ async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-exception = { workspace = true } +databend-common-expression = { workspace = true } databend-common-meta-app = { workspace = true } databend-common-pipeline-core = { workspace = true } databend-common-storages-fuse = { workspace = true } diff --git a/src/query/ee_features/virtual_column/src/virtual_column.rs b/src/query/ee_features/virtual_column/src/virtual_column.rs index 310c03726971..54e123dc57d8 100644 --- a/src/query/ee_features/virtual_column/src/virtual_column.rs +++ b/src/query/ee_features/virtual_column/src/virtual_column.rs @@ -18,6 +18,7 @@ use databend_common_base::base::GlobalInstance; use databend_common_catalog::catalog::Catalog; use 
databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::TableDataType; use databend_common_meta_app::schema::CreateVirtualColumnReq; use databend_common_meta_app::schema::DropVirtualColumnReq; use databend_common_meta_app::schema::ListVirtualColumnsReq; @@ -57,7 +58,7 @@ pub trait VirtualColumnHandler: Sync + Send { &self, ctx: Arc, fuse_table: &FuseTable, - virtual_columns: Vec, + virtual_columns: Vec<(String, TableDataType)>, segment_locs: Option>, pipeline: &mut Pipeline, ) -> Result<()>; @@ -113,7 +114,7 @@ impl VirtualColumnHandlerWrapper { &self, ctx: Arc, fuse_table: &FuseTable, - virtual_columns: Vec, + virtual_columns: Vec<(String, TableDataType)>, segment_locs: Option>, pipeline: &mut Pipeline, ) -> Result<()> { diff --git a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs index 891c892f7531..fab273344ca0 100644 --- a/src/query/sql/src/executor/physical_plans/physical_table_scan.rs +++ b/src/query/sql/src/executor/physical_plans/physical_table_scan.rs @@ -29,6 +29,7 @@ use databend_common_catalog::plan::VirtualColumnInfo; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::type_check::check_function; +use databend_common_expression::type_check::get_simple_cast_function; use databend_common_expression::types::DataType; use databend_common_expression::ConstantFolder; use databend_common_expression::DataField; @@ -36,6 +37,7 @@ use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::FieldIndex; use databend_common_expression::RemoteExpr; +use databend_common_expression::TableDataType; use databend_common_expression::TableField; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; @@ -563,12 +565,20 @@ impl PhysicalPlanBuilder { if let ColumnEntry::VirtualColumn(virtual_column) = self.metadata.read().column(*index) { source_column_ids.insert(virtual_column.source_column_id); + let cast_func_name = + if virtual_column.data_type.remove_nullable() != TableDataType::Variant { + let dest_type = DataType::from(&virtual_column.data_type.remove_nullable()); + get_simple_cast_function(true, &DataType::Variant, &dest_type) + } else { + None + }; let virtual_column_field = VirtualColumnField { source_column_id: virtual_column.source_column_id, source_name: virtual_column.source_column_name.clone(), column_id: virtual_column.column_id, name: virtual_column.column_name.clone(), key_paths: virtual_column.key_paths.clone(), + cast_func_name, data_type: Box::new(virtual_column.data_type.clone()), is_created: virtual_column.is_created, }; diff --git a/src/query/sql/src/planner/binder/bind_context.rs b/src/query/sql/src/planner/binder/bind_context.rs index 738816402d47..5d8e54e5509f 100644 --- a/src/query/sql/src/planner/binder/bind_context.rs +++ b/src/query/sql/src/planner/binder/bind_context.rs @@ -15,6 +15,7 @@ use std::collections::btree_map; use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::hash::Hash; use std::sync::Arc; @@ -32,6 +33,7 @@ use databend_common_expression::ColumnId; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_expression::TableDataType; use enum_as_inner::EnumAsInner; use indexmap::IndexMap; 
use itertools::Itertools;
@@ -108,6 +110,25 @@ pub enum NameResolutionResult {
     Alias { alias: String, scalar: ScalarExpr },
 }
 
+#[derive(Default, Clone, PartialEq, Eq, Debug)]
+pub struct VirtualColumnContext {
+    /// Whether rewriting as virtual columns and pushing down is allowed.
+    pub allow_pushdown: bool,
+    /// The table indices whose virtual columns have already been read,
+    /// used to avoid repeated reads.
+    pub table_indices: HashSet<IndexType>,
+    /// Mapping: (table index) -> (derived virtual column indices)
+    /// This is used to add virtual column indices to the Scan plan.
+    pub virtual_column_indices: HashMap<IndexType, Vec<IndexType>>,
+    /// Mapping: (table index) -> (virtual column names and data types)
+    /// This is used to check whether the virtual column has been created.
+    pub virtual_column_names: HashMap<IndexType, HashMap<String, TableDataType>>,
+    /// Mapping: (table index) -> (next virtual column id)
+    /// This is used to generate ids for virtual columns.
+    /// Not a real column id, only used to identify a virtual column.
+    pub next_column_ids: HashMap<IndexType, u32>,
+}
+
 /// `BindContext` stores all the free variables in a query and tracks the context of binding procedure.
 #[derive(Clone, Debug)]
 pub struct BindContext {
@@ -146,6 +167,8 @@ pub struct BindContext {
 
     pub inverted_index_map: Box>,
 
+    pub virtual_column_context: VirtualColumnContext,
+
     pub expr_context: ExprContext,
 
     /// If true, the query is planning for aggregate index.
@@ -259,6 +282,7 @@ impl BindContext {
             have_udf_script: false,
             have_udf_server: false,
             inverted_index_map: Box::default(),
+            virtual_column_context: VirtualColumnContext::default(),
             expr_context: ExprContext::default(),
             planning_agg_index: false,
             window_definitions: DashMap::new(),
@@ -280,6 +304,7 @@ impl BindContext {
             have_udf_script: false,
             have_udf_server: false,
             inverted_index_map: Box::default(),
+            virtual_column_context: Default::default(),
             expr_context: ExprContext::default(),
             planning_agg_index: false,
             window_definitions: DashMap::new(),
diff --git a/src/query/sql/src/planner/binder/bind_query/bind_select.rs b/src/query/sql/src/planner/binder/bind_query/bind_select.rs
index bd8fb366b9c5..ff28a910dae6 100644
--- a/src/query/sql/src/planner/binder/bind_query/bind_select.rs
+++ b/src/query/sql/src/planner/binder/bind_query/bind_select.rs
@@ -38,6 +38,8 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_expression::ScalarRef;
+use databend_common_license::license::Feature;
+use databend_common_license::license_manager::LicenseManagerSwitch;
 use derive_visitor::Drive;
 use derive_visitor::Visitor;
 use log::warn;
@@ -48,7 +50,6 @@ use crate::planner::binder::Binder;
 use crate::planner::query_executor::QueryExecutor;
 use crate::AsyncFunctionRewriter;
 use crate::ColumnBinding;
-use crate::VirtualColumnRewriter;
 
 // A normalized IR for `SELECT` clause.
 #[derive(Debug, Default)]
@@ -99,6 +100,12 @@ impl Binder {
             self.bind_table_reference(bind_context, &cross_joins)?
        };
 
+        // Whether rewriting as virtual columns and pushing down is allowed.
+        let allow_pushdown = LicenseManagerSwitch::instance()
+            .check_enterprise_enabled(self.ctx.get_license_key(), Feature::VirtualColumn)
+            .is_ok();
+        from_context.virtual_column_context.allow_pushdown = allow_pushdown;
+
         let mut rewriter = SelectRewriter::new(
             from_context.all_column_bindings(),
             self.name_resolution_ctx.unquoted_ident_case_sensitive,
@@ -248,9 +255,13 @@ impl Binder {
         s_expr = self.rewrite_udf(&mut from_context, s_expr)?;
 
         // rewrite variant inner fields as virtual columns
-        let mut virtual_column_rewriter =
-            VirtualColumnRewriter::new(self.ctx.clone(), self.metadata.clone());
-        s_expr = virtual_column_rewriter.rewrite(&s_expr)?;
+        if !from_context
+            .virtual_column_context
+            .virtual_column_indices
+            .is_empty()
+        {
+            s_expr = self.rewrite_virtual_column(&mut from_context, s_expr)?;
+        }
 
         // add internal column binding into expr
         s_expr = self.add_internal_column_into_expr(&mut from_context, s_expr)?;
diff --git a/src/query/sql/src/planner/binder/ddl/virtual_column.rs b/src/query/sql/src/planner/binder/ddl/virtual_column.rs
index 24ee4a2effd7..1b21ac9f52b3 100644
--- a/src/query/sql/src/planner/binder/ddl/virtual_column.rs
+++ b/src/query/sql/src/planner/binder/ddl/virtual_column.rs
@@ -12,8 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::collections::VecDeque;
+use std::mem;
 
 use databend_common_ast::ast::AlterVirtualColumnStmt;
 use databend_common_ast::ast::CreateVirtualColumnStmt;
@@ -26,6 +27,8 @@ use databend_common_ast::ast::ShowLimit;
 use databend_common_ast::ast::ShowVirtualColumnsStmt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::type_check::get_simple_cast_function;
+use databend_common_expression::types::DataType;
 use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchemaRef;
 use databend_common_meta_app::schema::ListVirtualColumnsReq;
@@ -33,14 +36,17 @@ use log::debug;
 
 use crate::binder::Binder;
 use crate::normalize_identifier;
+use crate::optimizer::SExpr;
 use crate::plans::AlterVirtualColumnPlan;
 use crate::plans::CreateVirtualColumnPlan;
 use crate::plans::DropVirtualColumnPlan;
 use crate::plans::Plan;
 use crate::plans::RefreshVirtualColumnPlan;
 use crate::plans::RewriteKind;
+use crate::resolve_type_name;
 use crate::BindContext;
 use crate::SelectBuilder;
+use crate::VirtualColumnRewriter;
 
 impl Binder {
     #[async_backtrace::framed]
@@ -197,10 +203,30 @@ impl Binder {
         &mut self,
         virtual_columns: &[Expr],
         schema: TableSchemaRef,
-    ) -> Result<Vec<String>> {
-        let mut virtual_names = HashSet::with_capacity(virtual_columns.len());
+    ) -> Result<Vec<(String, TableDataType)>> {
+        let mut virtual_names = HashMap::with_capacity(virtual_columns.len());
         for virtual_column in virtual_columns.iter() {
+            let mut typ = None;
             let mut expr = virtual_column;
+            match expr {
+                Expr::Cast {
+                    expr: inner_expr,
+                    target_type,
+                    ..
+                } => {
+                    expr = inner_expr;
+                    typ = Some(target_type);
+                }
+                Expr::TryCast {
+                    expr: inner_expr,
+                    target_type,
+                    ..
+ } => { + expr = inner_expr; + typ = Some(target_type); + } + _ => {} + } let mut paths = VecDeque::new(); while let Expr::MapAccess { expr: inner_expr, @@ -253,7 +279,30 @@ impl Binder { } virtual_name.push(']'); } - virtual_names.insert(virtual_name); + + let data_type = if let Some(typ) = typ { + let data_type = resolve_type_name(typ, false)?; + let dest_type = DataType::from(&data_type.remove_nullable()); + let cast_func_name = + get_simple_cast_function(true, &DataType::Variant, &dest_type); + if cast_func_name.is_none() { + return Err(ErrorCode::SemanticError(format!( + "Unsupported cast data type: {:?}", + typ + ))); + } + data_type.wrap_nullable() + } else { + TableDataType::Nullable(Box::new(TableDataType::Variant)) + }; + + if virtual_names.contains_key(&virtual_name) { + return Err(ErrorCode::SemanticError(format!( + "Duplicate virtual column: {}", + virtual_name + ))); + } + virtual_names.insert(virtual_name, data_type); } else { return Err(ErrorCode::SemanticError(format!( "Column is not exist: {:?}", @@ -268,8 +317,7 @@ impl Binder { } } let mut virtual_columns: Vec<_> = virtual_names.into_iter().collect(); - virtual_columns.sort(); - + virtual_columns.sort_by(|lv, rv| lv.0.cmp(&rv.0)); Ok(virtual_columns) } @@ -334,4 +382,25 @@ impl Binder { self.bind_rewrite_to_query(bind_context, &query, RewriteKind::ShowVirtualColumns) .await } + + // Rewrite virtual columns, add virtual column index to Scan plan. + pub(in crate::planner::binder) fn rewrite_virtual_column( + &mut self, + bind_context: &mut BindContext, + s_expr: SExpr, + ) -> Result { + if bind_context + .virtual_column_context + .virtual_column_indices + .is_empty() + { + return Ok(s_expr); + } + let virtual_column_indices = + mem::take(&mut bind_context.virtual_column_context.virtual_column_indices); + let mut s_expr = s_expr.clone(); + let mut virtual_column_rewriter = VirtualColumnRewriter::new(virtual_column_indices); + s_expr = virtual_column_rewriter.rewrite(&s_expr)?; + Ok(s_expr) + } } diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index dabd6ad96879..5502dccb6acd 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -231,6 +231,7 @@ impl Binder { have_udf_script: false, have_udf_server: false, inverted_index_map: Box::default(), + virtual_column_context: Default::default(), expr_context: ExprContext::default(), planning_agg_index: false, window_definitions: DashMap::new(), diff --git a/src/query/sql/src/planner/plans/ddl/virtual_column.rs b/src/query/sql/src/planner/plans/ddl/virtual_column.rs index f853afd33779..7ac990ed780e 100644 --- a/src/query/sql/src/planner/plans/ddl/virtual_column.rs +++ b/src/query/sql/src/planner/plans/ddl/virtual_column.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use databend_common_expression::DataSchema; use databend_common_expression::DataSchemaRef; +use databend_common_expression::TableDataType; use databend_common_meta_app::schema::CreateOption; use databend_storages_common_table_meta::meta::Location; @@ -25,7 +26,7 @@ pub struct CreateVirtualColumnPlan { pub catalog: String, pub database: String, pub table: String, - pub virtual_columns: Vec, + pub virtual_columns: Vec<(String, TableDataType)>, } impl CreateVirtualColumnPlan { @@ -40,7 +41,7 @@ pub struct AlterVirtualColumnPlan { pub catalog: String, pub database: String, pub table: String, - pub virtual_columns: Vec, + pub virtual_columns: Vec<(String, TableDataType)>, } impl AlterVirtualColumnPlan { @@ -68,7 +69,7 @@ pub 
struct RefreshVirtualColumnPlan {
     pub catalog: String,
     pub database: String,
     pub table: String,
-    pub virtual_columns: Vec<String>,
+    pub virtual_columns: Vec<(String, TableDataType)>,
     pub segment_locs: Option<Vec<Location>>,
 }
diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs
index 38350db16026..c7209a1ac4c9 100644
--- a/src/query/sql/src/planner/semantic/type_check.rs
+++ b/src/query/sql/src/planner/semantic/type_check.rs
@@ -52,6 +52,7 @@ use databend_common_catalog::plan::InternalColumn;
 use databend_common_catalog::plan::InternalColumnType;
 use databend_common_catalog::plan::InvertedIndexInfo;
 use databend_common_catalog::plan::InvertedIndexOption;
+use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_compress::CompressAlgorithm;
 use databend_common_compress::DecompressDecoder;
@@ -97,6 +98,7 @@ use databend_common_meta_app::principal::UDFServer;
 use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
 use databend_common_meta_app::schema::DictionaryIdentity;
 use databend_common_meta_app::schema::GetSequenceReq;
+use databend_common_meta_app::schema::ListVirtualColumnsReq;
 use databend_common_meta_app::schema::SequenceIdent;
 use databend_common_storage::init_stage_operator;
 use databend_common_users::UserApiProvider;
@@ -158,8 +160,11 @@ use crate::plans::WindowOrderBy;
 use crate::BaseTableColumn;
 use crate::BindContext;
 use crate::ColumnBinding;
+use crate::ColumnBindingBuilder;
 use crate::ColumnEntry;
 use crate::MetadataRef;
+use crate::TableEntry;
+use crate::Visibility;
 
 /// A helper for type checking.
 ///
@@ -4524,6 +4529,178 @@ impl<'a> TypeChecker<'a> {
         Ok(Box::new((subquery_expr.into(), data_type)))
     }
 
+    async fn get_virtual_columns(
+        &self,
+        table_entry: &TableEntry,
+        table: Arc<dyn Table>,
+    ) -> Result<Option<HashMap<String, TableDataType>>> {
+        let table_id = table.get_id();
+        let req = ListVirtualColumnsReq::new(self.ctx.get_tenant(), Some(table_id));
+        let catalog = self.ctx.get_catalog(table_entry.catalog()).await?;
+
+        if let Ok(virtual_column_metas) = catalog.list_virtual_columns(req).await {
+            if !virtual_column_metas.is_empty() {
+                let mut virtual_column_name_map =
+                    HashMap::with_capacity(virtual_column_metas[0].virtual_columns.len());
+                for (name, typ) in virtual_column_metas[0].virtual_columns.iter() {
+                    virtual_column_name_map.insert(name.clone(), typ.clone());
+                }
+                return Ok(Some(virtual_column_name_map));
+            }
+        }
+        Ok(None)
+    }
+
+    fn try_rewrite_virtual_column(
+        &mut self,
+        base_column: &BaseTableColumn,
+        keypaths: &KeyPaths,
+    ) -> Result<Option<Box<(ScalarExpr, DataType)>>> {
+        if !self.bind_context.virtual_column_context.allow_pushdown {
+            return Ok(None);
+        }
+
+        let metadata = self.metadata.read().clone();
+        let table_entry = metadata.table(base_column.table_index);
+
+        let table = table_entry.table();
+        // Ignore tables that do not support virtual columns
+        if !table.support_virtual_columns() {
+            return Ok(None);
+        }
+        let schema = table.schema();
+
+        if !self
+            .bind_context
+            .virtual_column_context
+            .table_indices
+            .contains(&base_column.table_index)
+        {
+            let virtual_column_name_map = databend_common_base::runtime::block_on(
+                self.get_virtual_columns(table_entry, table),
+            )?;
+            self.bind_context
+                .virtual_column_context
+                .table_indices
+                .insert(base_column.table_index);
+            if let Some(virtual_column_name_map) = virtual_column_name_map {
+                self.bind_context
+                    .virtual_column_context
+                    .virtual_column_names
+                    .insert(base_column.table_index, virtual_column_name_map);
+                self.bind_context +
.virtual_column_context + .next_column_ids + .insert(base_column.table_index, schema.next_column_id); + } + } + + if let Some(virtual_column_name_map) = self + .bind_context + .virtual_column_context + .virtual_column_names + .get(&base_column.table_index) + { + let mut name = String::new(); + name.push_str(&base_column.column_name); + for path in &keypaths.paths { + name.push('['); + match path { + KeyPath::Index(idx) => { + name.push_str(&idx.to_string()); + } + KeyPath::QuotedName(field) | KeyPath::Name(field) => { + name.push('\''); + name.push_str(field.as_ref()); + name.push('\''); + } + } + name.push(']'); + } + + let virtual_type = virtual_column_name_map.get(&name); + let is_created = virtual_type.is_some(); + + let mut index = 0; + // Check for duplicate virtual columns + for table_column in metadata.virtual_columns_by_table_index(base_column.table_index) { + if table_column.name() == name { + index = table_column.index(); + break; + } + } + + let table_data_type = if let Some(virtual_type) = virtual_type { + virtual_type.wrap_nullable() + } else { + TableDataType::Nullable(Box::new(TableDataType::Variant)) + }; + + if index == 0 { + let column_id = self + .bind_context + .virtual_column_context + .next_column_ids + .get(&base_column.table_index) + .unwrap(); + + let keypaths_str = format!("{}", keypaths); + let keypaths_value = Scalar::String(keypaths_str); + + index = self.metadata.write().add_virtual_column( + base_column, + *column_id, + name.clone(), + table_data_type.clone(), + keypaths_value.clone(), + None, + is_created, + ); + + // Increments the column id of the virtual column. + let column_id = self + .bind_context + .virtual_column_context + .next_column_ids + .get_mut(&base_column.table_index) + .unwrap(); + *column_id += 1; + } + + if let Some(indices) = self + .bind_context + .virtual_column_context + .virtual_column_indices + .get_mut(&base_column.table_index) + { + indices.push(index); + } else { + self.bind_context + .virtual_column_context + .virtual_column_indices + .insert(base_column.table_index, vec![index]); + } + + let data_type = DataType::from(&table_data_type); + let column_binding = ColumnBindingBuilder::new( + name, + index, + Box::new(data_type.clone()), + Visibility::InVisible, + ) + .table_index(Some(base_column.table_index)) + .build(); + + let virtual_column = ScalarExpr::BoundColumnRef(BoundColumnRef { + span: None, + column: column_binding, + }); + Ok(Some(Box::new((virtual_column, data_type)))) + } else { + Ok(None) + } + } + // Rewrite variant map access as `get_by_keypath` function fn resolve_variant_map_access( &mut self, @@ -4550,8 +4727,23 @@ impl<'a> TypeChecker<'a> { }; key_paths.push(key_path); } + let keypaths = KeyPaths { paths: key_paths }; + // try rewrite as virtual column and pushdown to storage layer. + if let ScalarExpr::BoundColumnRef(BoundColumnRef { ref column, .. }) = scalar { + if column.index < self.metadata.read().columns().len() { + let column_entry = self.metadata.read().column(column.index).clone(); + if let ColumnEntry::BaseTableColumn(base_column) = column_entry { + if let Some(box (scalar, data_type)) = + self.try_rewrite_virtual_column(&base_column, &keypaths)? 
+ { + return Ok(Box::new((scalar, data_type))); + } + } + } + } + let keypaths_str = format!("{}", keypaths); let path_scalar = ScalarExpr::ConstantExpr(ConstantExpr { span: None, diff --git a/src/query/sql/src/planner/semantic/virtual_column_rewriter.rs b/src/query/sql/src/planner/semantic/virtual_column_rewriter.rs index 39cdd90bdc43..02dd594f63dd 100644 --- a/src/query/sql/src/planner/semantic/virtual_column_rewriter.rs +++ b/src/query/sql/src/planner/semantic/virtual_column_rewriter.rs @@ -13,325 +13,50 @@ // limitations under the License. use std::collections::HashMap; -use std::collections::HashSet; use std::sync::Arc; -use databend_common_catalog::table::Table; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; -use databend_common_expression::types::DataType; -use databend_common_expression::Scalar; -use databend_common_expression::TableDataType; -use databend_common_license::license::Feature::VirtualColumn; -use databend_common_license::license_manager::LicenseManagerSwitch; -use databend_common_meta_app::schema::ListVirtualColumnsReq; -use jsonb::keypath::parse_key_paths; -use jsonb::keypath::KeyPath; use crate::optimizer::SExpr; -use crate::plans::walk_expr_mut; -use crate::plans::BoundColumnRef; -use crate::plans::FunctionCall; use crate::plans::RelOperator; -use crate::plans::ScalarExpr; -use crate::plans::VisitorMut; -use crate::ColumnBindingBuilder; -use crate::ColumnEntry; use crate::IndexType; -use crate::MetadataRef; -use crate::TableEntry; -use crate::Visibility; pub(crate) struct VirtualColumnRewriter { - ctx: Arc, - metadata: MetadataRef, - /// Mapping: (table index) -> (derived virtual column indices) /// This is used to add virtual column indices to Scan plan - table_virtual_columns: HashMap>, - - /// Mapping: (table index) -> (virtual column names) - /// This is used to check whether the virtual column has be created - virtual_column_names: HashMap>, - - /// Mapping: (table index) -> (next virtual column id) - /// The is used to generate virtual column id for virtual columns. - /// Not a real column id, only used to identify a virtual column. 
-    next_column_ids: HashMap<IndexType, u32>,
+    virtual_column_indices: HashMap<IndexType, Vec<IndexType>>,
 }
 
 impl VirtualColumnRewriter {
-    pub(crate) fn new(ctx: Arc<dyn TableContext>, metadata: MetadataRef) -> Self {
+    pub(crate) fn new(virtual_column_indices: HashMap<IndexType, Vec<IndexType>>) -> Self {
         Self {
-            ctx,
-            metadata,
-            table_virtual_columns: Default::default(),
-            virtual_column_names: Default::default(),
-            next_column_ids: Default::default(),
-        }
-    }
-
-    pub(crate) fn rewrite(&mut self, s_expr: &SExpr) -> Result<SExpr> {
-        if LicenseManagerSwitch::instance()
-            .check_enterprise_enabled(self.ctx.get_license_key(), VirtualColumn)
-            .is_err()
-        {
-            return Ok(s_expr.clone());
-        }
-
-        let metadata = self.metadata.read().clone();
-        for table_entry in metadata.tables() {
-            let table = table_entry.table();
-            // Ignore tables that do not support virtual columns
-            if !table.support_virtual_columns() {
-                continue;
-            }
-
-            let schema = table.schema();
-            let has_variant_column = schema
-                .fields
-                .iter()
-                .any(|field| field.data_type.remove_nullable() == TableDataType::Variant);
-            // Ignore tables that do not have fields of variant type
-            if !has_variant_column {
-                continue;
-            }
-            self.next_column_ids
-                .insert(table_entry.index(), schema.next_column_id);
-
-            databend_common_base::runtime::block_on(self.get_virtual_columns(table_entry, table))?;
-        }
-        // If all tables do not have virtual columns created,
-        // there is no need to continue checking for rewrites as virtual columns
-        if self.virtual_column_names.is_empty() {
-            return Ok(s_expr.clone());
+            virtual_column_indices,
         }
-
-        self.rewrite_virtual_column(s_expr)
     }
 
-    async fn get_virtual_columns(
-        &mut self,
-        table_entry: &TableEntry,
-        table: Arc<dyn Table>,
-    ) -> Result<()> {
-        let table_id = table.get_id();
-        let req = ListVirtualColumnsReq::new(self.ctx.get_tenant(), Some(table_id));
-        let catalog = self.ctx.get_catalog(table_entry.catalog()).await?;
-
-        if let Ok(virtual_column_metas) = catalog.list_virtual_columns(req).await {
-            if !virtual_column_metas.is_empty() {
-                let virtual_column_name_set =
-                    HashSet::from_iter(virtual_column_metas[0].virtual_columns.iter().cloned());
-                self.virtual_column_names
-                    .insert(table_entry.index(), virtual_column_name_set);
-            }
-        }
-        Ok(())
-    }
-
-    // Find the functions that reads the inner fields of variant columns, rewrite them as virtual columns.
-    // Add the indices of the virtual columns to the Scan plan of the corresponding table
+    // Add the indices of the virtual columns to the Scan plan of the table
     // to read the virtual columns at the storage layer.
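    // The indices for a table are taken out of the map with `remove()`, so a
    // matching Scan plan is patched at most once during the recursive walk below.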
#[recursive::recursive] - fn rewrite_virtual_column(&mut self, s_expr: &SExpr) -> Result { + pub(crate) fn rewrite(&mut self, s_expr: &SExpr) -> Result { let mut s_expr = s_expr.clone(); - match (*s_expr.plan).clone() { - RelOperator::Scan(mut scan) => { - let virtual_indices = self.table_virtual_columns.get(&scan.table_index); - if let Some(indices) = virtual_indices { - for index in indices { - scan.columns.insert(*index); - } - s_expr.plan = Arc::new(scan.into()); + if let RelOperator::Scan(mut scan) = (*s_expr.plan).clone() { + if let Some(indices) = self.virtual_column_indices.remove(&scan.table_index) { + for index in indices { + scan.columns.insert(index); } + s_expr.plan = Arc::new(scan.into()); } - RelOperator::EvalScalar(mut eval_scalar) => { - for item in &mut eval_scalar.items { - if self - .try_replace_virtual_column(&mut item.scalar, Some(item.index)) - .is_some() - { - continue; - } - self.visit(&mut item.scalar)?; - } - s_expr.plan = Arc::new(eval_scalar.into()); - } - RelOperator::Filter(mut filter) => { - for scalar in &mut filter.predicates { - self.visit(scalar)?; - } - s_expr.plan = Arc::new(filter.into()); - } - RelOperator::ProjectSet(mut project_set) => { - for item in &mut project_set.srfs { - if self - .try_replace_virtual_column(&mut item.scalar, Some(item.index)) - .is_some() - { - continue; - } - self.visit(&mut item.scalar)?; - } - s_expr.plan = Arc::new(project_set.into()); - } - _ => {} } if !s_expr.children.is_empty() { let mut children = Vec::with_capacity(s_expr.children.len()); for child in s_expr.children.iter() { - children.push(Arc::new(self.rewrite_virtual_column(child)?)); + children.push(Arc::new(self.rewrite(child)?)); } s_expr.children = children; } Ok(s_expr) } - - // Find the `get_by_keypath` function that takes a variant column and a constant path value as arguments. - // Generate a virtual column in its place so that we can push down the reading virtual column to the storage layer. - // This allows us to using the already generated and stored virtual column data to speed up queries. - // TODO: Support other variant `get` functions. - fn try_replace_virtual_column( - &mut self, - expr: &mut ScalarExpr, - item_index: Option, - ) -> Option<()> { - match expr { - ScalarExpr::FunctionCall(FunctionCall { - func_name, - arguments, - .. - }) if func_name == "get_by_keypath" && arguments.len() == 2 => { - if let ( - ScalarExpr::BoundColumnRef(column_ref), - ScalarExpr::ConstantExpr(constant), - ) = (arguments[0].clone(), arguments[1].clone()) - { - let column_entry = self.metadata.read().column(column_ref.column.index).clone(); - if let ColumnEntry::BaseTableColumn(base_column) = column_entry { - if base_column.data_type.remove_nullable() != TableDataType::Variant { - return Some(()); - } - let name = match constant.value.clone() { - Scalar::String(v) => match parse_key_paths(v.as_bytes()) { - Ok(key_paths) => { - let mut name = String::new(); - name.push_str(&base_column.column_name); - for path in key_paths.paths { - name.push('['); - match path { - KeyPath::Index(idx) => { - name.push_str(&idx.to_string()); - } - KeyPath::QuotedName(field) | KeyPath::Name(field) => { - name.push('\''); - name.push_str(field.as_ref()); - name.push('\''); - } - } - name.push(']'); - } - name - } - Err(_) => { - return Some(()); - } - }, - _ => { - return Some(()); - } - }; - // If this field name does not have a virtual column created, - // we can't read this data directly, but we can still generate a virtual column - // and push down to the storage layer. 
- // Those virtual columns can be generated from the source column. - // This can be used to reminder user to create it to speed up the query. - let is_created = - match self.virtual_column_names.get(&base_column.table_index) { - Some(names) => names.contains(&name), - None => false, - }; - - let mut index = 0; - // Check for duplicate virtual columns - for table_column in self - .metadata - .read() - .virtual_columns_by_table_index(base_column.table_index) - { - if table_column.name() == name { - index = table_column.index(); - break; - } - } - if index == 0 { - let column_id = - self.next_column_ids.get(&base_column.table_index).unwrap(); - let table_data_type = - TableDataType::Nullable(Box::new(TableDataType::Variant)); - index = self.metadata.write().add_virtual_column( - &base_column, - *column_id, - name.clone(), - table_data_type, - constant.value.clone(), - item_index, - is_created, - ); - - // Increments the column id of the virtual column. - let column_id = self - .next_column_ids - .get_mut(&base_column.table_index) - .unwrap(); - *column_id += 1; - } - - if let Some(indices) = - self.table_virtual_columns.get_mut(&base_column.table_index) - { - indices.push(index); - } else { - self.table_virtual_columns - .insert(base_column.table_index, vec![index]); - } - - let column_binding = ColumnBindingBuilder::new( - name, - index, - Box::new(DataType::Nullable(Box::new(DataType::Variant))), - Visibility::InVisible, - ) - .table_index(Some(base_column.table_index)) - .build(); - - let virtual_column = ScalarExpr::BoundColumnRef(BoundColumnRef { - span: None, - column: column_binding, - }); - *expr = virtual_column; - return Some(()); - } - } - } - _ => {} - } - - None - } -} - -impl<'a> VisitorMut<'a> for VirtualColumnRewriter { - fn visit(&mut self, expr: &'a mut ScalarExpr) -> Result<()> { - if self.try_replace_virtual_column(expr, None).is_some() { - return Ok(()); - } - walk_expr_mut(self, expr)?; - - Ok(()) - } } diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs b/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs index 5ac0643be8f0..95a517865669 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader_native_deserialize.rs @@ -287,9 +287,10 @@ impl BlockReader { pub(crate) fn build_virtual_column_iter( name: String, + data_type: TableDataType, readers: Vec>>, ) -> Result> { - let field = TableField::new(&name, TableDataType::Variant.wrap_nullable()); + let field = TableField::new(&name, data_type); let native_column_reader = NativeColumnsReader::new()?; match native_column_reader.column_iters(readers, field, vec![]) { diff --git a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs index 742213e32027..60f5e9d481fb 100644 --- a/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs +++ b/src/query/storages/fuse/src/io/read/virtual_column/virtual_column_reader_parquet.rs @@ -178,7 +178,19 @@ impl VirtualColumnReader { &BUILTIN_FUNCTIONS, )?; - let column = BlockEntry::new(data_type, value); + let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { + let (cast_value, cast_data_type) = eval_function( + None, + cast_func_name, + [(value, data_type)], + &func_ctx, + data_block.num_rows(), + &BUILTIN_FUNCTIONS, + )?; + BlockEntry::new(cast_data_type, 
cast_value) + } else { + BlockEntry::new(data_type, value) + }; data_block.add_column(column); } diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs index 6b3c0945d0e4..d365acc0b5da 100644 --- a/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs +++ b/src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs @@ -440,8 +440,7 @@ impl NativeDeserializeDataTransform { .find(|c| c.0 == src_index) .map(|c| c.1.clone()) { - let data_type: DataType = - (*self.src_schema.field(src_index).data_type()).clone(); + let data_type: DataType = virtual_column_field.data_type.as_ref().into(); let num_rows = column.len(); let column = BlockEntry::new(data_type.clone(), Value::Column(column.clone())); // If the source column is the default value, num_rows may be zero @@ -471,7 +470,19 @@ impl NativeDeserializeDataTransform { &BUILTIN_FUNCTIONS, )?; - let column = BlockEntry::new(data_type, value); + let column = if let Some(cast_func_name) = &virtual_column_field.cast_func_name { + let (cast_value, cast_data_type) = eval_function( + None, + cast_func_name, + [(value, data_type)], + &self.func_ctx, + block.num_rows(), + &BUILTIN_FUNCTIONS, + )?; + BlockEntry::new(cast_data_type, cast_value) + } else { + BlockEntry::new(data_type, value) + }; block.add_column(column); } } @@ -631,7 +642,7 @@ impl NativeDeserializeDataTransform { // Add optional virtual columns' column_iters. if let Some(virtual_reader) = self.virtual_reader.as_ref() { - for (index, virtual_column_info) in virtual_reader + for (index, virtual_column_field) in virtual_reader .virtual_column_info .virtual_column_fields .iter() @@ -640,10 +651,11 @@ impl NativeDeserializeDataTransform { let virtual_index = index + self.block_reader.project_column_nodes.len(); if let Some(readers) = columns.remove(&virtual_index) { let column_iter = BlockReader::build_virtual_column_iter( - virtual_column_info.name.clone(), + virtual_column_field.name.clone(), + *virtual_column_field.data_type.clone(), readers, )?; - let index = self.src_schema.index_of(&virtual_column_info.name)?; + let index = self.src_schema.index_of(&virtual_column_field.name)?; self.read_state.column_iters.insert(index, column_iter); self.read_state.column_skip_pages.insert(index, 0); } diff --git a/src/query/storages/system/src/virtual_columns_table.rs b/src/query/storages/system/src/virtual_columns_table.rs index f60458300b87..a827e9611588 100644 --- a/src/query/storages/system/src/virtual_columns_table.rs +++ b/src/query/storages/system/src/virtual_columns_table.rs @@ -33,6 +33,7 @@ use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::VirtualColumnMeta; use databend_common_meta_types::MetaId; use databend_common_storages_fuse::TableContext; +use itertools::Itertools; use crate::columns_table::dump_tables; use crate::table::AsyncOneBlockSystemTable; @@ -79,7 +80,19 @@ impl AsyncSystemTable for VirtualColumnsTable { if let Some(virtual_column_meta) = virtual_column_meta_map.remove(&table_id) { database_names.push(database.clone()); table_names.push(table.name().to_string()); - virtual_columns.push(virtual_column_meta.virtual_columns.join(", ")); + virtual_columns.push( + virtual_column_meta + .virtual_columns + .iter() + .map(|(name, ty)| { + if ty.remove_nullable() == TableDataType::Variant { + name.to_string() + } else { + format!("{}::{}", name, ty.remove_nullable()) + } + }) + .join(", 
"), + ); created_on_columns.push(virtual_column_meta.created_on.timestamp_micros()); updated_on_columns .push(virtual_column_meta.updated_on.map(|u| u.timestamp_micros())); diff --git a/src/tests/sqlsmith/src/sql_gen/ddl.rs b/src/tests/sqlsmith/src/sql_gen/ddl.rs index 0b891f085ec1..d3d93de476d7 100644 --- a/src/tests/sqlsmith/src/sql_gen/ddl.rs +++ b/src/tests/sqlsmith/src/sql_gen/ddl.rs @@ -66,6 +66,11 @@ const SIMPLE_COLUMN_TYPES: [TypeName; 21] = [ impl<'a, R: Rng> SqlGenerator<'a, R> { pub(crate) fn gen_base_tables(&mut self) -> Vec<(DropTableStmt, CreateTableStmt)> { let mut tables = Vec::with_capacity(BASE_TABLE_NAMES.len()); + + let mut table_options = BTreeMap::new(); + if self.rng.gen_bool(0.3) { + table_options.insert("storage_format".to_string(), "native".to_string()); + } for table_name in BASE_TABLE_NAMES { let source = self.gen_table_source(); @@ -76,6 +81,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { table: Identifier::from_name(None, table_name), all: false, }; + let create_table = CreateTableStmt { create_option: CreateOption::CreateIfNotExists, catalog: None, @@ -85,7 +91,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { engine: Some(Engine::Fuse), uri_location: None, cluster_by: None, - table_options: BTreeMap::new(), + table_options: table_options.clone(), as_query: None, table_type: TableType::Normal, }; diff --git a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test index 5a03bce5cb0b..7c04da472a40 100644 --- a/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test +++ b/tests/sqllogictests/suites/ee/01_ee_system/01_0002_virtual_column.test @@ -28,10 +28,10 @@ statement ok create table t1(id int, val json) storage_format = 'native' statement ok -insert into t1 values(1, '{"a":11,"b":1}'), (2, '{"a":22}'), (3, '3') +insert into t1 values(1, '{"a":11,"b":1,"c":"test"}'), (2, '{"a":22,"b":2,"c":"data"}'), (3, '3') statement ok -create virtual column (val['a'], val['b']) for t1 +create virtual column (val['a'], val['b']::int, val['c']::string) for t1 statement ok refresh virtual column for t1 @@ -39,46 +39,46 @@ refresh virtual column for t1 query TTT show virtual columns from t1 ---- -test_virtual_column t1 val['a'], val['b'] +test_virtual_column t1 val['a'], val['b']::Int32, val['c']::String statement ok -insert into t1 values(4, '{"a":44,"b":4}'), (5, '{"a":55}'), (6, '6') +insert into t1 values(4, '{"a":44,"b":4,"c":"value"}'), (5, '{"a":55,"b":5,"c":"bend"}'), (6, '6') -query ITT -select id, val['a'], val['b'] from t1 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t1 order by id ---- -1 11 1 -2 22 NULL -3 NULL NULL -4 44 4 -5 55 NULL -6 NULL NULL +1 11 1 test +2 22 2 data +3 NULL NULL NULL +4 44 4 value +5 55 5 bend +6 NULL NULL NULL -query ITTT -select id, val['a'], val['b'], val from t1 order by id +query ITITT +select id, val['a'], val['b'], val['c'], val from t1 order by id ---- -1 11 1 {"a":11,"b":1} -2 22 NULL {"a":22} -3 NULL NULL 3 -4 44 4 {"a":44,"b":4} -5 55 NULL {"a":55} -6 NULL NULL 6 +1 11 1 test {"a":11,"b":1,"c":"test"} +2 22 2 data {"a":22,"b":2,"c":"data"} +3 NULL NULL NULL 3 +4 44 4 value {"a":44,"b":4,"c":"value"} +5 55 5 bend {"a":55,"b":5,"c":"bend"} +6 NULL NULL NULL 6 -query ITTT -select id, val['a'], val['b'], val['c'] from t1 order by id +query ITITT +select id, val['a'], val['b'], val['c'], val['d'] from t1 order by id ---- -1 11 1 NULL -2 22 NULL NULL -3 NULL NULL NULL -4 44 4 NULL -5 55 NULL NULL -6 NULL NULL NULL +1 
11 1 test NULL +2 22 2 data NULL +3 NULL NULL NULL NULL +4 44 4 value NULL +5 55 5 bend NULL +6 NULL NULL NULL NULL -query ITT -select id, val['a'], val['b'] from t1 where val=3 or val=6 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t1 where val=3 or val=6 order by id ---- -3 NULL NULL -6 NULL NULL +3 NULL NULL NULL +6 NULL NULL NULL query ITTT select id, val['a'], val['b'], val from t1 where val=3 or val=6 order by id @@ -86,17 +86,17 @@ select id, val['a'], val['b'], val from t1 where val=3 or val=6 order by id 3 NULL NULL 3 6 NULL NULL 6 -query ITT -select id, val['a'], val['b'] from t1 where val['a']=11 or val['a']=44 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t1 where val['a']=11 or val['a']=44 order by id ---- -1 11 1 -4 44 4 +1 11 1 test +4 44 4 value -query ITTT -select id, val['a'], val['b'], val from t1 where val['a']=11 or val['a']=44 order by id +query ITITT +select id, val['a'], val['b'], val['c'], val from t1 where val['a']=11 or val['a']=44 order by id ---- -1 11 1 {"a":11,"b":1} -4 44 4 {"a":44,"b":4} +1 11 1 test {"a":11,"b":1,"c":"test"} +4 44 4 value {"a":44,"b":4,"c":"value"} query IT select max(id), val:a from t1 group by val:a order by val:a @@ -115,15 +115,15 @@ select t11.id, t11.a, t12.id, t12.a from(select id, val:a as a from t1)t11 join 4 44 4 44 5 55 5 55 -query ITT -SELECT r.id, r.val['a'], r.nval:a FROM ( SELECT r.id, r.val, r.val as nval FROM t1 AS r) AS r order by id +query ITITI +SELECT r.id, r.val['a'], r.val['b'], r.nval:a, r.nval:b FROM ( SELECT r.id, r.val, r.val as nval FROM t1 AS r) AS r order by id ---- -1 11 11 -2 22 22 -3 NULL NULL -4 44 44 -5 55 55 -6 NULL NULL +1 11 1 11 1 +2 22 2 22 2 +3 NULL NULL NULL NULL +4 44 4 44 4 +5 55 5 55 5 +6 NULL NULL NULL NULL statement ok drop table if exists t2 @@ -132,10 +132,10 @@ statement ok create table t2(id int, val json) storage_format = 'parquet' statement ok -insert into t2 values(1, '{"a":11,"b":1}'), (2, '{"a":22}'), (3, '3') +insert into t2 values(1, '{"a":11,"b":1,"c":"test"}'), (2, '{"a":22,"b":2,"c":"data"}'), (3, '3') statement ok -create virtual column (val['a'], val['b']) for t2 +create virtual column (val['a'], val['b']::int, val['c']::string) for t2 statement ok refresh virtual column for t2 @@ -143,46 +143,46 @@ refresh virtual column for t2 query TTT show virtual columns from t2 ---- -test_virtual_column t2 val['a'], val['b'] +test_virtual_column t2 val['a'], val['b']::Int32, val['c']::String statement ok -insert into t2 values(4, '{"a":44,"b":4}'), (5, '{"a":55}'), (6, '6') +insert into t2 values(4, '{"a":44,"b":4,"c":"value"}'), (5, '{"a":55,"b":5,"c":"bend"}'), (6, '6') -query ITT -select id, val['a'], val['b'] from t2 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t2 order by id ---- -1 11 1 -2 22 NULL -3 NULL NULL -4 44 4 -5 55 NULL -6 NULL NULL +1 11 1 test +2 22 2 data +3 NULL NULL NULL +4 44 4 value +5 55 5 bend +6 NULL NULL NULL -query ITTT -select id, val['a'], val['b'], val from t2 order by id +query ITITT +select id, val['a'], val['b'], val['c'], val from t2 order by id ---- -1 11 1 {"a":11,"b":1} -2 22 NULL {"a":22} -3 NULL NULL 3 -4 44 4 {"a":44,"b":4} -5 55 NULL {"a":55} -6 NULL NULL 6 +1 11 1 test {"a":11,"b":1,"c":"test"} +2 22 2 data {"a":22,"b":2,"c":"data"} +3 NULL NULL NULL 3 +4 44 4 value {"a":44,"b":4,"c":"value"} +5 55 5 bend {"a":55,"b":5,"c":"bend"} +6 NULL NULL NULL 6 -query ITTT -select id, val['a'], val['b'], val['c'] from t2 order by id +query ITITT +select id, val['a'], val['b'], val['c'], 
val['d'] from t2 order by id ---- -1 11 1 NULL -2 22 NULL NULL -3 NULL NULL NULL -4 44 4 NULL -5 55 NULL NULL -6 NULL NULL NULL +1 11 1 test NULL +2 22 2 data NULL +3 NULL NULL NULL NULL +4 44 4 value NULL +5 55 5 bend NULL +6 NULL NULL NULL NULL -query ITT -select id, val['a'], val['b'] from t2 where val=3 or val=6 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t2 where val=3 or val=6 order by id ---- -3 NULL NULL -6 NULL NULL +3 NULL NULL NULL +6 NULL NULL NULL query ITTT select id, val['a'], val['b'], val from t2 where val=3 or val=6 order by id @@ -190,17 +190,17 @@ select id, val['a'], val['b'], val from t2 where val=3 or val=6 order by id 3 NULL NULL 3 6 NULL NULL 6 -query ITT -select id, val['a'], val['b'] from t2 where val['a']=11 or val['a']=44 order by id +query ITIT +select id, val['a'], val['b'], val['c'] from t2 where val['a']=11 or val['a']=44 order by id ---- -1 11 1 -4 44 4 +1 11 1 test +4 44 4 value -query ITTT -select id, val['a'], val['b'], val from t2 where val['a']=11 or val['a']=44 order by id +query ITITT +select id, val['a'], val['b'], val['c'], val from t2 where val['a']=11 or val['a']=44 order by id ---- -1 11 1 {"a":11,"b":1} -4 44 4 {"a":44,"b":4} +1 11 1 test {"a":11,"b":1,"c":"test"} +4 44 4 value {"a":44,"b":4,"c":"value"} query IT select max(id), val:a from t2 group by val:a order by val:a @@ -212,22 +212,22 @@ select max(id), val:a from t2 group by val:a order by val:a 6 NULL query ITIT -select t11.id, t11.a, t12.id, t12.a from(select id, val:a as a from t2)t11 join (select id, val:a as a from t2)t12 on t11.a = t12.a order by t11.a +select t21.id, t21.a, t22.id, t22.a from(select id, val:a as a from t2)t21 join (select id, val:a as a from t2)t22 on t21.a = t22.a order by t21.a ---- 1 11 1 11 2 22 2 22 4 44 4 44 5 55 5 55 -query ITT -SELECT r.id, r.val['a'], r.nval:a FROM ( SELECT r.id, r.val, r.val as nval FROM t2 AS r) AS r order by id +query ITITI +SELECT r.id, r.val['a'], r.val['b'], r.nval:a, r.nval:b FROM ( SELECT r.id, r.val, r.val as nval FROM t2 AS r) AS r order by id ---- -1 11 11 -2 22 22 -3 NULL NULL -4 44 44 -5 55 55 -6 NULL NULL +1 11 1 11 1 +2 22 2 22 2 +3 NULL NULL NULL NULL +4 44 4 44 4 +5 55 5 55 5 +6 NULL NULL NULL NULL statement ok create or replace virtual column (val['a']) for t1; From 0c3c0e1e8c7796c14401c7e3f041bbe8c5816887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 28 Nov 2024 22:15:47 +0800 Subject: [PATCH 6/7] refactor: make test_watch_expired_events() less sensitive to inaccurate time (#16966) * refactor: remove unused sled related config from databend-meta * chore: adjust gRPC logging level * refactor: make test_watch_expired_events() less sensitive to inaccurate time - Fix: #16942 * refactor: add short sleep to test_meta_node_join_with_state to ensure server quits completely --- src/meta/binaries/metactl/import.rs | 4 +- src/meta/process/src/process_meta_dir.rs | 2 +- src/meta/raft-store/src/config.rs | 20 -------- src/meta/raft-store/src/ondisk/mod.rs | 5 +- .../raft-store/src/ondisk/upgrade_to_v004.rs | 4 +- src/meta/service/src/api/grpc/grpc_service.rs | 7 ++- src/meta/service/src/configs/outer_v0.rs | 20 +------- src/meta/service/tests/it/configs.rs | 1 - .../tests/it/grpc/metasrv_grpc_watch.rs | 48 ++++++++----------- .../tests/it/meta_node/meta_node_lifecycle.rs | 23 +++++---- src/meta/service/tests/it/tests/service.rs | 3 -- 11 files changed, 44 insertions(+), 93 deletions(-) diff --git a/src/meta/binaries/metactl/import.rs b/src/meta/binaries/metactl/import.rs 
index bf525f7bfacd..f54ac47534c5 100644 --- a/src/meta/binaries/metactl/import.rs +++ b/src/meta/binaries/metactl/import.rs @@ -134,7 +134,7 @@ async fn import_v003( raft_config: RaftConfig, lines: impl IntoIterator>, ) -> anyhow::Result> { - let db = init_get_sled_db(raft_config.raft_dir.clone(), raft_config.sled_cache_size()); + let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024); let mut n = 0; let mut max_log_id: Option = None; @@ -386,7 +386,7 @@ async fn init_new_cluster( fn clear(args: &ImportArgs) -> anyhow::Result<()> { eprintln!(); eprintln!("Clear All Sled Trees Before Import:"); - let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 64 * 1024 * 1024 * 1024); + let db = init_get_sled_db(args.raft_dir.clone().unwrap(), 1024 * 1024 * 1024); let tree_names = db.tree_names(); for n in tree_names.iter() { diff --git a/src/meta/process/src/process_meta_dir.rs b/src/meta/process/src/process_meta_dir.rs index 0c0d773dd2ba..ee074584d381 100644 --- a/src/meta/process/src/process_meta_dir.rs +++ b/src/meta/process/src/process_meta_dir.rs @@ -25,7 +25,7 @@ pub fn process_sled_db(config: &Config, convert: F) -> anyhow::Result<()> where F: Fn(RaftStoreEntry) -> Result, anyhow::Error> { let raft_config = &config.raft_config; - let db = init_get_sled_db(raft_config.raft_dir.clone(), 64 * 1024 * 1024 * 1024); + let db = init_get_sled_db(raft_config.raft_dir.clone(), 1024 * 1024 * 1024); let mut tree_names = db.tree_names(); tree_names.sort(); diff --git a/src/meta/raft-store/src/config.rs b/src/meta/raft-store/src/config.rs index 6fec9de0867f..b4f59c120aff 100644 --- a/src/meta/raft-store/src/config.rs +++ b/src/meta/raft-store/src/config.rs @@ -135,12 +135,6 @@ pub struct RaftConfig { /// Otherwise this argument is ignored. pub id: NodeId, - /// For test only: specifies the tree name prefix - pub sled_tree_prefix: String, - - /// The maximum memory in MB that sled can use for caching. - pub sled_max_cache_size_mb: u64, - /// The node name. If the user specifies a name, /// the user-supplied name is used, if not, the default name is used. pub cluster_name: String, @@ -190,8 +184,6 @@ impl Default for RaftConfig { leave_via: vec![], leave_id: None, id: 0, - sled_tree_prefix: "".to_string(), - sled_max_cache_size_mb: 10 * 1024, cluster_name: "foo_cluster".to_string(), wait_leader_timeout: 70000, } @@ -298,16 +290,4 @@ impl RaftConfig { } Ok(()) } - - /// Create a unique sled::Tree name by prepending a unique prefix. - /// So that multiple instance that depends on a sled::Tree can be used in one process. - /// sled does not allow to open multiple `sled::Db` in one process. - pub fn tree_name(&self, name: impl std::fmt::Display) -> String { - format!("{}{}", self.sled_tree_prefix, name) - } - - /// Return the size of sled cache in bytes. 
- pub fn sled_cache_size(&self) -> u64 { - self.sled_max_cache_size_mb * 1024 * 1024 - } } diff --git a/src/meta/raft-store/src/ondisk/mod.rs b/src/meta/raft-store/src/ondisk/mod.rs index 950f894e474d..3478e05a9525 100644 --- a/src/meta/raft-store/src/ondisk/mod.rs +++ b/src/meta/raft-store/src/ondisk/mod.rs @@ -159,10 +159,9 @@ impl OnDisk { return Ok(()); } - let db = init_get_sled_db(config.raft_dir.clone(), config.sled_cache_size()); + let db = init_get_sled_db(config.raft_dir.clone(), 1024 * 1024 * 1024); - let tree_name = config.tree_name(TREE_HEADER); - let tree = SledTree::open(&db, &tree_name, config.is_sync())?; + let tree = SledTree::open(&db, TREE_HEADER, config.is_sync())?; let ks = tree.key_space::(); let header = ks.get(&Self::KEY_HEADER.to_string()).map_err(|e| { diff --git a/src/meta/raft-store/src/ondisk/upgrade_to_v004.rs b/src/meta/raft-store/src/ondisk/upgrade_to_v004.rs index 6d978d119dc1..a982ba1f8b71 100644 --- a/src/meta/raft-store/src/ondisk/upgrade_to_v004.rs +++ b/src/meta/raft-store/src/ondisk/upgrade_to_v004.rs @@ -59,7 +59,7 @@ impl OnDisk { let raft_log = RaftLogV004::open(raft_log_config)?; let mut importer = importer::Importer::new(raft_log); - let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size()); + let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024); // Read the purged index let first_log_index = { @@ -213,7 +213,7 @@ impl OnDisk { self.progress(format_args!(" Remove V003 log from sled db",)); - let db = init_get_sled_db(self.config.raft_dir.clone(), self.config.sled_cache_size()); + let db = init_get_sled_db(self.config.raft_dir.clone(), 1024 * 1024 * 1024); for tree_name in db.tree_names() { if tree_name == "__sled__default" { continue; diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 0692f25c0af5..19d6d7ceaf27 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -57,7 +57,6 @@ use futures::stream::TryChunksError; use futures::StreamExt; use futures::TryStreamExt; use log::debug; -use log::info; use prost::Message; use tokio_stream; use tokio_stream::Stream; @@ -109,7 +108,7 @@ impl MetaServiceImpl { #[fastrace::trace] async fn handle_kv_api(&self, request: Request) -> Result { let req: MetaGrpcReq = request.try_into()?; - info!("{}: Received MetaGrpcReq: {:?}", func_name!(), req); + debug!("{}: Received MetaGrpcReq: {:?}", func_name!(), req); let m = &self.meta_node; let reply = match &req { @@ -134,7 +133,7 @@ impl MetaServiceImpl { ) -> Result<(Option, BoxStream), Status> { let req: MetaGrpcReadReq = GrpcHelper::parse_req(request)?; - info!("{}: Received ReadRequest: {:?}", func_name!(), req); + debug!("{}: Received ReadRequest: {:?}", func_name!(), req); let req = ForwardRequest::new(1, req); @@ -156,7 +155,7 @@ impl MetaServiceImpl { ) -> Result<(Option, TxnReply), Status> { let txn = request.into_inner(); - info!("{}: Received TxnRequest: {}", func_name!(), txn); + debug!("{}: Received TxnRequest: {}", func_name!(), txn); let ent = LogEntry::new(Cmd::Transaction(txn.clone())); diff --git a/src/meta/service/src/configs/outer_v0.rs b/src/meta/service/src/configs/outer_v0.rs index 49ad99ca8af2..d97d309bd5df 100644 --- a/src/meta/service/src/configs/outer_v0.rs +++ b/src/meta/service/src/configs/outer_v0.rs @@ -305,8 +305,6 @@ pub struct ConfigViaEnv { pub kvsrv_single: bool, pub metasrv_join: Vec, pub kvsrv_id: u64, - pub sled_tree_prefix: String, - pub 
sled_max_cache_size_mb: u64, pub cluster_name: String, } @@ -363,8 +361,6 @@ impl From for ConfigViaEnv { kvsrv_single: cfg.raft_config.single, metasrv_join: cfg.raft_config.join, kvsrv_id: cfg.raft_config.id, - sled_tree_prefix: cfg.raft_config.sled_tree_prefix, - sled_max_cache_size_mb: cfg.raft_config.sled_max_cache_size_mb, cluster_name: cfg.raft_config.cluster_name, } } @@ -405,8 +401,6 @@ impl Into for ConfigViaEnv { // Do not allow to leave via environment variable leave_id: None, id: self.kvsrv_id, - sled_tree_prefix: self.sled_tree_prefix, - sled_max_cache_size_mb: self.sled_max_cache_size_mb, cluster_name: self.cluster_name, }; let log_config = LogConfig { @@ -539,7 +533,7 @@ pub struct RaftConfig { /// The total cache size for snapshot blocks. /// - /// By default it is 1GB. + /// By default, it is 1GB. #[clap(long, default_value = "1073741824")] pub snapshot_db_block_cache_size: u64, @@ -574,14 +568,6 @@ pub struct RaftConfig { #[clap(long, default_value = "0")] pub id: u64, - /// For test only: specifies the tree name prefix - #[clap(long, default_value = "")] - pub sled_tree_prefix: String, - - /// The maximum memory in MB that sled can use for caching. Default is 10GB - #[clap(long, default_value = "10240")] - pub sled_max_cache_size_mb: u64, - /// The node name. If the user specifies a name, the user-supplied name is used, /// if not, the default name is used #[clap(long, default_value = "foo_cluster")] @@ -630,8 +616,6 @@ impl From for InnerRaftConfig { leave_via: x.leave_via, leave_id: x.leave_id, id: x.id, - sled_tree_prefix: x.sled_tree_prefix, - sled_max_cache_size_mb: x.sled_max_cache_size_mb, cluster_name: x.cluster_name, wait_leader_timeout: x.wait_leader_timeout, } @@ -668,8 +652,6 @@ impl From for RaftConfig { leave_via: inner.leave_via, leave_id: inner.leave_id, id: inner.id, - sled_tree_prefix: inner.sled_tree_prefix, - sled_max_cache_size_mb: inner.sled_max_cache_size_mb, cluster_name: inner.cluster_name, wait_leader_timeout: inner.wait_leader_timeout, } diff --git a/src/meta/service/tests/it/configs.rs b/src/meta/service/tests/it/configs.rs index 4f4f2daba9a5..ad5e9f3cb9b1 100644 --- a/src/meta/service/tests/it/configs.rs +++ b/src/meta/service/tests/it/configs.rs @@ -89,7 +89,6 @@ cluster_name = "foo_cluster" assert!(!cfg.raft_config.single); assert_eq!(cfg.raft_config.join, vec!["j1", "j2"]); assert_eq!(cfg.raft_config.id, 20); - assert_eq!(cfg.raft_config.sled_tree_prefix, "sled_foo"); assert_eq!(cfg.raft_config.cluster_name, "foo_cluster"); }); diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index d7c02d9e68ad..cb295a737706 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -311,12 +311,14 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { // - Before applying, 32 expired keys will be cleaned. // - When applying, touched expired keys will be cleaned. 
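    // Timing below is deliberately coarse: TTLs of 1, 6 and 15 seconds are
    // checked against a 10-second sleep, and expire_at values are compared
    // only after rounding, so a modestly inaccurate clock is far less likely
    // to flip the outcome.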
+ fn sec(x: u64) -> Duration { + Duration::from_secs(x) + } + let (_tc, addr) = crate::tests::start_metasrv().await?; let watch_prefix = "w_"; let now_sec = now(); - let expire = now_sec + 11; - // dbg!(now_sec, expire); info!("--- prepare data that are gonna expire"); { @@ -332,31 +334,19 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { for i in 0..(32 + 1) { let k = format!("w_auto_gc_{}", i); txn.if_then - .push(TxnOp::put_with_ttl(&k, b(&k), Some(Duration::from_secs(1)))); + .push(TxnOp::put_with_ttl(&k, b(&k), Some(sec(1)))); } // Expired key won't be cleaned when they are read, although read returns None. - txn.if_then.push(TxnOp::put_with_ttl( - "w_b1", - b("w_b1"), - Some(Duration::from_secs(6)), - )); - txn.if_then.push(TxnOp::put_with_ttl( - "w_b2", - b("w_b2"), - Some(Duration::from_secs(6)), - )); - txn.if_then.push(TxnOp::put_with_ttl( - "w_b3a", - b("w_b3a"), - Some(Duration::from_secs(6)), - )); - txn.if_then.push(TxnOp::put_with_ttl( - "w_b3b", - b("w_b3b"), - Some(Duration::from_secs(11)), - )); + txn.if_then + .push(TxnOp::put_with_ttl("w_b1", b("w_b1"), Some(sec(6)))); + txn.if_then + .push(TxnOp::put_with_ttl("w_b2", b("w_b2"), Some(sec(6)))); + txn.if_then + .push(TxnOp::put_with_ttl("w_b3a", b("w_b3a"), Some(sec(6)))); + txn.if_then + .push(TxnOp::put_with_ttl("w_b3b", b("w_b3b"), Some(sec(15)))); client.transaction(txn).await?; } @@ -373,8 +363,8 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { watch_client.request(watch).await? }; - info!("--- sleep {} for expiration", expire - now_sec); - tokio::time::sleep(Duration::from_secs(10)).await; + info!("--- sleep 10s for expiration"); + tokio::time::sleep(sec(10)).await; info!("--- apply another txn in another thread to override keys"); { @@ -430,20 +420,20 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { "w_b3b", seq + 3, "w_b3b", - Some(KvMeta::new_expire(now_sec + 16)), + Some(KvMeta::new_expire(now_sec + 15)), ), // expired ]; - // remove the millisecond part of expire_at + // The evaluated expire_at may not exactly equal the real expire_at, so tidy both sides before comparing. fn tidy(mut ev: Event) -> Event { if let Some(ref mut prev) = ev.prev { if let Some(ref mut meta) = prev.meta { - meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000); + meta.expire_at = meta.expire_at.map(|x| x / 10 * 10); } } if let Some(ref mut current) = ev.current { if let Some(ref mut meta) = current.meta { - meta.expire_at = meta.expire_at.map(|x| x / 1000 * 1000); + meta.expire_at = meta.expire_at.map(|x| x / 10 * 10); } } ev diff --git a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs index a2114f8d940b..a23a9b67449b 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs @@ -319,11 +319,11 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> { tc2.config.raft_config.single = false; tc2.config.raft_config.join = vec![tc0.config.raft_config.raft_api_addr().await?.to_string()]; - let meta_node = MetaNode::start(&tc0.config).await?; - // Initial log, leader blank log, add node-0. + let n1 = MetaNode::start(&tc0.config).await?; + // Initial membership log, leader blank log, add node-0 log.
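// Editor's aside (a sketch, not part of this patch): the log-index
// bookkeeping below follows the counts stated in this test's own comments;
// spelled out, the arithmetic is:
//
//     let mut log_index = 3; // initial membership + leader blank + add node-0
//     log_index += 3;        // per join: two membership logs + one add-node log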
let mut log_index = 3; - let res = meta_node + let res = n1 .join_cluster( &tc0.config.raft_config, tc0.config.grpc_api_advertise_address(), ) .await?; assert_eq!(Err("Did not join: --join is empty".to_string()), res); - let meta_node1 = MetaNode::start(&tc1.config).await?; - let res = meta_node1 + let n1 = MetaNode::start(&tc1.config).await?; + let res = n1 .join_cluster( &tc1.config.raft_config, tc1.config.grpc_api_advertise_address(), ) .await?; @@ -342,8 +342,7 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> { // Two membership logs, one add-node log; log_index += 3; - meta_node1 - .raft + n1.raft .wait(timeout()) .applied_index(Some(log_index), "node-1 join cluster") .await?; @@ -354,6 +353,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> { n2.stop().await?; } + // Wait a second to ensure the server quits completely. + sleep(Duration::from_secs(1)).await; + info!("--- Allow to join node-2 with initialized store"); { let n2 = MetaNode::start(&tc2.config).await?; @@ -368,8 +370,8 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> { // Two membership logs, one add-node log; log_index += 3; - // Add this barrier to ensure all of the logs are applied before quit. - // Otherwise the next time node-2 starts it can not see the applied + // Add this barrier to ensure all the logs are applied before quitting. + // Otherwise, the next time node-2 starts it cannot see the applied membership and believes it has not yet joined into a cluster. n2.raft .wait(timeout()) @@ -379,6 +381,9 @@ async fn test_meta_node_join_with_state() -> anyhow::Result<()> { n2.stop().await?; } + // Wait a second to ensure the server quits completely. + sleep(Duration::from_secs(1)).await; + info!("--- Not allowed to join node-2 with store with membership"); { let n2 = MetaNode::start(&tc2.config).await?; diff --git a/src/meta/service/tests/it/tests/service.rs b/src/meta/service/tests/it/tests/service.rs index ee59f4846538..2fc5357340eb 100644 --- a/src/meta/service/tests/it/tests/service.rs +++ b/src/meta/service/tests/it/tests/service.rs @@ -160,9 +160,6 @@ impl MetaSrvTestContext { let host = "127.0.0.1"; - // We use a single sled db for all unit test. Every unit test need a unique prefix so that it opens different tree. - config.raft_config.sled_tree_prefix = format!("test-{}-", config_id); - { let grpc_port = next_port(); config.grpc_api_address = format!("{}:{}", host, grpc_port); From 8cbda70939c2f17654c6fab1dca3ccef3c78936f Mon Sep 17 00:00:00 2001 From: zhya Date: Thu, 28 Nov 2024 21:47:20 +0800 Subject: [PATCH 7/7] chore(storage): refine error message for stream read offset snapshot (#16964) * chore: refine error message for stream read offset snapshot * fix --- src/query/sql/src/planner/binder/binder.rs | 2 +- .../storages/fuse/src/operations/changes.rs | 25 +++++++++++++------ .../storages/fuse/src/operations/commit.rs | 16 +----------- .../common/processors/sink_commit.rs | 15 ++++++++--- src/query/storages/stream/src/stream_table.rs | 4 +-- .../storages/system/src/streams_table.rs | 13 ++++------ 6 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index e21f85497de4..c5b7c4114aaf 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -643,7 +643,7 @@ impl<'a> Binder { }; match plan.kind() { - QueryKind::Query { ..
} | QueryKind::Explain { .. } => {} + QueryKind::Query | QueryKind::Explain => {} _ => { let meta_data_guard = self.metadata.read(); let tables = meta_data_guard.tables(); diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs index a114fcda3218..ed414cdb7f61 100644 --- a/src/query/storages/fuse/src/operations/changes.rs +++ b/src/query/storages/fuse/src/operations/changes.rs @@ -39,6 +39,7 @@ use databend_common_expression::BASE_BLOCK_IDS_COL_NAME; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; +use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::table::ChangeType; use databend_storages_common_table_meta::table::StreamMode; use databend_storages_common_table_meta::table::OPT_KEY_CHANGE_TRACKING_BEGIN_VER; @@ -222,9 +223,8 @@ impl FuseTable { let latest_segments: HashSet<&Location> = HashSet::from_iter(&latest_snapshot.segments); - let (base_snapshot, _) = - SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()) - .await?; + let base_snapshot = + self.changes_read_offset_snapshot(base_location).await?; let base_segments = HashSet::from_iter(&base_snapshot.segments); // If the base segments are a subset of the latest segments, @@ -349,8 +349,7 @@ impl FuseTable { }; let base_segments = if let Some(snapshot) = base { - let (sn, _) = - SnapshotsIO::read_snapshot(snapshot.to_string(), self.get_operator()).await?; + let sn = self.changes_read_offset_snapshot(snapshot).await?; HashSet::from_iter(sn.segments.clone()) } else { HashSet::new() @@ -435,8 +434,7 @@ impl FuseTable { return self.table_statistics(ctx, true, None).await; }; - let (base_snapshot, _) = - SnapshotsIO::read_snapshot(base_location.clone(), self.get_operator()).await?; + let base_snapshot = self.changes_read_offset_snapshot(base_location).await?; let base_summary = base_snapshot.summary.clone(); let latest_summary = if let Some(snapshot) = self.read_table_snapshot().await? 
{ snapshot.summary.clone() @@ -499,6 +497,19 @@ impl FuseTable { } } } + + pub async fn changes_read_offset_snapshot( + &self, + base_location: &String, + ) -> Result<Arc<TableSnapshot>> { + match SnapshotsIO::read_snapshot(base_location.to_string(), self.get_operator()).await { + Ok((base_snapshot, _)) => Ok(base_snapshot), + Err(_) => Err(ErrorCode::IllegalStream(format!( + "Failed to read the offset snapshot: {:?}, it may have been purged", + base_location + ))), + } + } } fn replace_push_downs( diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index 618366ad3f80..edc0a5c76124 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -480,22 +480,8 @@ impl FuseTable { } } - #[inline] - pub fn is_error_recoverable(e: &ErrorCode, is_table_transient: bool) -> bool { - let code = e.code(); - code == ErrorCode::TABLE_VERSION_MISMATCHED - || (is_table_transient && code == ErrorCode::STORAGE_NOT_FOUND) - } - - #[inline] - pub fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool { - // currently, the only error that we know, which indicates there are no side effects - // is TABLE_VERSION_MISMATCHED - e.code() == ErrorCode::TABLE_VERSION_MISMATCHED - } - - // check if there are any fuse table legacy options - pub fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) { + fn remove_legacy_options(table_options: &mut BTreeMap<String, String>) { table_options.remove(OPT_KEY_LEGACY_SNAPSHOT_LOC); } } diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index dd1a23cad263..551d0aa2c1c3 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -138,12 +138,21 @@ where F: SnapshotGenerator + Send + 'static } fn is_error_recoverable(&self, e: &ErrorCode) -> bool { + let code = e.code(); // When prev_snapshot_id is some, means it is an alter table column modification or truncate. // In this case if commit to meta fail and error is TABLE_VERSION_MISMATCHED operation will be aborted. - if self.prev_snapshot_id.is_some() && e.code() == ErrorCode::TABLE_VERSION_MISMATCHED { + if self.prev_snapshot_id.is_some() && code == ErrorCode::TABLE_VERSION_MISMATCHED { return false; } - FuseTable::is_error_recoverable(e, self.purge) + + code == ErrorCode::TABLE_VERSION_MISMATCHED + || (self.purge && code == ErrorCode::STORAGE_NOT_FOUND) + } + + fn no_side_effects_in_meta_store(e: &ErrorCode) -> bool { + // Currently, the only error known to indicate that there are no side effects + // is TABLE_VERSION_MISMATCHED. + e.code() == ErrorCode::TABLE_VERSION_MISMATCHED } fn read_meta(&mut self) -> Result { @@ -469,7 +478,7 @@ where F: SnapshotGenerator + Send + 'static None => { // Commit not fulfilled. try to abort the operations. // if it is safe to do so. - if FuseTable::no_side_effects_in_meta_store(&e) { + if Self::no_side_effects_in_meta_store(&e) { // if we are sure that table state inside metastore has not been // modified by this operation, abort this operation.
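// Editor's aside (a sketch, not part of this patch): after the move above,
// commit-failure handling reduces to a small decision table; the enum below
// is illustrative and does not exist in the codebase:
//
//     enum CommitOutcome {
//         Retry,   // TABLE_VERSION_MISMATCHED, or STORAGE_NOT_FOUND while purging
//         Abort,   // no side effects in the meta store, so cleanup is safe
//         Surface, // unknown state: report the error without cleanup
//     }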
self.state = State::Abort(e); diff --git a/src/query/storages/stream/src/stream_table.rs b/src/query/storages/stream/src/stream_table.rs index b43a08db11f8..e3f0bbec4c8d 100644 --- a/src/query/storages/stream/src/stream_table.rs +++ b/src/query/storages/stream/src/stream_table.rs @@ -42,7 +42,6 @@ use databend_common_pipeline_core::Pipeline; use databend_common_sql::binder::STREAM_COLUMN_FACTORY; use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::SnapshotHistoryReader; -use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::io::TableMetaLocationGenerator; use databend_common_storages_fuse::FuseTable; use databend_storages_common_table_meta::table::ChangeType; @@ -173,8 +172,7 @@ impl StreamTable { fuse_table.check_changes_valid(source_desc, self.offset()?)?; let (base_row_count, base_timsestamp) = if let Some(base_loc) = self.snapshot_loc() { - let (base, _) = - SnapshotsIO::read_snapshot(base_loc.to_string(), fuse_table.get_operator()).await?; + let base = fuse_table.changes_read_offset_snapshot(&base_loc).await?; (base.summary.row_count, base.timestamp) } else { (0, None) diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index 614839112377..7ac2b487582b 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -39,7 +39,6 @@ use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; -use databend_common_storages_fuse::io::SnapshotsIO; use databend_common_storages_fuse::operations::acquire_task_permit; use databend_common_storages_fuse::FuseTable; use databend_common_storages_stream::stream_table::StreamTable; @@ -250,13 +249,11 @@ impl AsyncSystemTable for StreamsTable { let fuse_table = FuseTable::try_from_table(source.as_ref()).unwrap(); if let Some(location) = stream_table.snapshot_loc() { - reason = SnapshotsIO::read_snapshot( - location, - fuse_table.get_operator(), - ) - .await - .err() - .map_or("".to_string(), |e| e.display_text()); + reason = fuse_table + .changes_read_offset_snapshot(&location) + .await + .err() + .map_or("".to_string(), |e| e.display_text()); } } Err(e) => {