diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index d44d98df1a86..f70dea498c9b 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -425,40 +425,40 @@ impl TryFrom for Vec { } } -impl TryFrom for Vec<(NameOrId, Object)> { +impl TryFrom for Vec> { type Error = ParsePbError; fn try_from(value: pb::IndexPredicate) -> Result { - let mut primary_key_values = vec![]; - // for pk values, which should be a set of and_conditions. - let and_predicates = value - .or_predicates - .get(0) - .ok_or(ParsePbError::EmptyFieldError("`OrCondition` is emtpy".to_string()))?; - for predicate in &and_predicates.predicates { - let key_pb = predicate - .key - .clone() - .ok_or("key is empty in kv_pair in indexed_scan")?; - let value_pb = predicate - .value - .clone() - .ok_or("value is empty in kv_pair in indexed_scan")?; - let key = match key_pb.item { - Some(common_pb::property::Item::Key(prop_key)) => prop_key.try_into()?, - _ => Err(ParsePbError::Unsupported( - "Other keys rather than property key in kv_pair in indexed_scan".to_string(), - ))?, - }; - if let pb::index_predicate::triplet::Value::Const(value) = value_pb { - let obj_val = Object::try_from(value)?; - primary_key_values.push((key, obj_val)); - } else { - Err(ParsePbError::Unsupported(format!( - "unsupported indexed predicate value {:?}", - value_pb - )))? + let mut primary_key_values = Vec::with_capacity(value.or_predicates.len()); + for and_predicates in value.or_predicates { + // PkValue can be one-column or multi-columns, which is a set of and_conditions. + let mut primary_key_value = Vec::with_capacity(and_predicates.predicates.len()); + for predicate in &and_predicates.predicates { + let key_pb = predicate + .key + .clone() + .ok_or("key is empty in kv_pair in indexed_scan")?; + let value_pb = predicate + .value + .clone() + .ok_or("value is empty in kv_pair in indexed_scan")?; + let key = match key_pb.item { + Some(common_pb::property::Item::Key(prop_key)) => prop_key.try_into()?, + _ => Err(ParsePbError::Unsupported( + "Other keys rather than property key in kv_pair in indexed_scan".to_string(), + ))?, + }; + if let pb::index_predicate::triplet::Value::Const(value) = value_pb { + let obj_val = Object::try_from(value)?; + primary_key_value.push((key, obj_val)); + } else { + Err(ParsePbError::Unsupported(format!( + "unsupported indexed predicate value {:?}", + value_pb + )))? + } } + primary_key_values.push(primary_key_value); } Ok(primary_key_values) } diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 4739d7244ae1..4c57001e7095 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -847,6 +847,7 @@ fn triplet_to_index_predicate( let schema = meta.schema.as_ref().unwrap(); let mut key = None; let mut is_eq = false; + let mut is_within = false; if let Some(item) = &operators.get(0).unwrap().item { match item { common_pb::expr_opr::Item::Var(var) => { @@ -886,30 +887,57 @@ fn triplet_to_index_predicate( if *l == 0 { // Eq is_eq = true; + } else if *l == 6 { + // Within + is_within = true; } } _ => { /*do nothing*/ } } }; - if !is_eq { + if !is_eq && !is_within { return Ok(None); } if let Some(item) = &operators.get(2).unwrap().item { match item { common_pb::expr_opr::Item::Const(c) => { - let idx_pred = pb::IndexPredicate { - or_predicates: vec![pb::index_predicate::AndPredicate { - predicates: vec![pb::index_predicate::Triplet { - key, - value: Some(c.clone().into()), - cmp: None, - }], - }], - }; - return Ok(Some(idx_pred)); + if is_within { + let or_predicates = match c.item.clone().unwrap() { + common_pb::value::Item::I32Array(array) => array + .item + .into_iter() + .map(|val| build_and_predicate(key.clone(), val.into())) + .collect(), + common_pb::value::Item::I64Array(array) => array + .item + .into_iter() + .map(|val| build_and_predicate(key.clone(), val.into())) + .collect(), + common_pb::value::Item::F64Array(array) => array + .item + .into_iter() + .map(|val| build_and_predicate(key.clone(), val.into())) + .collect(), + common_pb::value::Item::StrArray(array) => array + .item + .into_iter() + .map(|val| build_and_predicate(key.clone(), val.into())) + .collect(), + _ => Err(IrError::Unsupported(format!( + "unsupported value type for within: {:?}", + c + )))?, + }; + return Ok(Some(pb::IndexPredicate { or_predicates })); + } else { + let idx_pred = + pb::IndexPredicate { or_predicates: vec![build_and_predicate(key, c.clone())] }; + return Ok(Some(idx_pred)); + } } + common_pb::expr_opr::Item::Param(param) => { let idx_pred = pb::IndexPredicate { or_predicates: vec![pb::index_predicate::AndPredicate { @@ -930,6 +958,14 @@ fn triplet_to_index_predicate( Ok(None) } +fn build_and_predicate( + key: Option, value: common_pb::Value, +) -> pb::index_predicate::AndPredicate { + pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { key, value: Some(value.into()), cmp: None }], + } +} + fn get_table_id_from_pb(schema: &Schema, name: &common_pb::NameOrId) -> Option { name.item.as_ref().and_then(|item| match item { common_pb::name_or_id::Item::Name(name) => schema.get_table_id(name), @@ -2294,6 +2330,62 @@ mod test { ); } + #[test] + fn scan_pred_to_idx_pred_with_within() { + let mut plan_meta = PlanMeta::default(); + plan_meta.set_curr_node(0); + plan_meta.curr_node_meta_mut(); + plan_meta.refer_to_nodes(0, vec![0]); + let meta = StoreMeta { + schema: Some( + Schema::from_json(std::fs::File::open("resource/modern_schema_pk.json").unwrap()).unwrap(), + ), + }; + let mut scan = pb::Scan { + scan_opt: 0, + alias: None, + params: Some(pb::QueryParams { + tables: vec!["person".into()], + columns: vec![], + is_all_columns: false, + limit: None, + predicate: Some(str_to_expr_pb("@.name within [\"John\", \"Josh\"]".to_string()).unwrap()), + sample_ratio: 1.0, + extra: HashMap::new(), + }), + idx_predicate: None, + is_count_only: false, + meta_data: None, + }; + + scan.preprocess(&meta, &mut plan_meta).unwrap(); + assert!(scan.params.unwrap().predicate.is_none()); + assert_eq!( + scan.idx_predicate.unwrap(), + pb::IndexPredicate { + or_predicates: vec![ + pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key: Some(common_pb::Property { + item: Some(common_pb::property::Item::Key("name".into())), + }), + value: Some("John".to_string().into()), + cmp: None, + }] + }, + pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key: Some(common_pb::Property { + item: Some(common_pb::property::Item::Key("name".into())), + }), + value: Some("Josh".to_string().into()), + cmp: None, + }] + } + ] + } + ); + } #[test] fn column_maintain_case1() { let mut plan = LogicalPlan::with_root(); diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 76f9179ca349..1304e6ccf192 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -43,7 +43,7 @@ pub enum SourceType { pub struct SourceOperator { query_params: QueryParams, src: Option>>, - primary_key_values: Option, + primary_key_values: Option>, alias: Option, source_type: SourceType, is_count_only: bool, @@ -83,8 +83,12 @@ impl SourceOperator { debug!("Runtime source op of indexed scan of global ids {:?}", source_op); } else { // query by indexed_scan - let primary_key_values = >::try_from(ip2)?; - source_op.primary_key_values = Some(PKV::from(primary_key_values)); + let primary_key_values = >>::try_from(ip2)?; + let pkvs = primary_key_values + .into_iter() + .map(|pkv| PKV::from(pkv)) + .collect(); + source_op.primary_key_values = Some(pkvs); debug!("Runtime source op of indexed scan {:?}", source_op); } Ok(source_op) @@ -135,13 +139,13 @@ impl SourceOperator { } else { let mut v_source = Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { + if let Some(seeds) = &self.src { if let Some(src) = seeds.get(&(worker_index as u64)) { if !src.is_empty() { v_source = graph.get_vertex(src, &self.query_params)?; } } - } else if let Some(ref indexed_values) = self.primary_key_values { + } else if let Some(pkvs) = &self.primary_key_values { if self.query_params.labels.is_empty() { Err(FnGenError::unsupported_error( "Empty label in `IndexScan` self.query_params.labels", @@ -149,10 +153,10 @@ impl SourceOperator { } let mut source_vertices = vec![]; for label in &self.query_params.labels { - if let Some(v) = - graph.index_scan_vertex(*label, indexed_values, &self.query_params)? - { - source_vertices.push(v); + for pkv in pkvs { + if let Some(v) = graph.index_scan_vertex(*label, pkv, &self.query_params)? { + source_vertices.push(v); + } } } v_source = Box::new(source_vertices.into_iter());