Skip to content

Commit

Permalink
fix(interactive): Support Index predicate with multiple expected valu…
Browse files Browse the repository at this point in the history
…es given via P.within() (#3300)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #3299
  • Loading branch information
BingqingLyu authored Oct 19, 2023
1 parent f813f9b commit d065429
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 50 deletions.
60 changes: 30 additions & 30 deletions interactive_engine/executor/ir/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,40 +425,40 @@ impl TryFrom<pb::IndexPredicate> for Vec<i64> {
}
}

impl TryFrom<pb::IndexPredicate> for Vec<(NameOrId, Object)> {
impl TryFrom<pb::IndexPredicate> for Vec<Vec<(NameOrId, Object)>> {
type Error = ParsePbError;

fn try_from(value: pb::IndexPredicate) -> Result<Self, Self::Error> {
let mut primary_key_values = vec![];
// for pk values, which should be a set of and_conditions.
let and_predicates = value
.or_predicates
.get(0)
.ok_or(ParsePbError::EmptyFieldError("`OrCondition` is emtpy".to_string()))?;
for predicate in &and_predicates.predicates {
let key_pb = predicate
.key
.clone()
.ok_or("key is empty in kv_pair in indexed_scan")?;
let value_pb = predicate
.value
.clone()
.ok_or("value is empty in kv_pair in indexed_scan")?;
let key = match key_pb.item {
Some(common_pb::property::Item::Key(prop_key)) => prop_key.try_into()?,
_ => Err(ParsePbError::Unsupported(
"Other keys rather than property key in kv_pair in indexed_scan".to_string(),
))?,
};
if let pb::index_predicate::triplet::Value::Const(value) = value_pb {
let obj_val = Object::try_from(value)?;
primary_key_values.push((key, obj_val));
} else {
Err(ParsePbError::Unsupported(format!(
"unsupported indexed predicate value {:?}",
value_pb
)))?
let mut primary_key_values = Vec::with_capacity(value.or_predicates.len());
for and_predicates in value.or_predicates {
// PkValue can be one-column or multi-columns, which is a set of and_conditions.
let mut primary_key_value = Vec::with_capacity(and_predicates.predicates.len());
for predicate in &and_predicates.predicates {
let key_pb = predicate
.key
.clone()
.ok_or("key is empty in kv_pair in indexed_scan")?;
let value_pb = predicate
.value
.clone()
.ok_or("value is empty in kv_pair in indexed_scan")?;
let key = match key_pb.item {
Some(common_pb::property::Item::Key(prop_key)) => prop_key.try_into()?,
_ => Err(ParsePbError::Unsupported(
"Other keys rather than property key in kv_pair in indexed_scan".to_string(),
))?,
};
if let pb::index_predicate::triplet::Value::Const(value) = value_pb {
let obj_val = Object::try_from(value)?;
primary_key_value.push((key, obj_val));
} else {
Err(ParsePbError::Unsupported(format!(
"unsupported indexed predicate value {:?}",
value_pb
)))?
}
}
primary_key_values.push(primary_key_value);
}
Ok(primary_key_values)
}
Expand Down
114 changes: 103 additions & 11 deletions interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ fn triplet_to_index_predicate(
let schema = meta.schema.as_ref().unwrap();
let mut key = None;
let mut is_eq = false;
let mut is_within = false;
if let Some(item) = &operators.get(0).unwrap().item {
match item {
common_pb::expr_opr::Item::Var(var) => {
Expand Down Expand Up @@ -886,30 +887,57 @@ fn triplet_to_index_predicate(
if *l == 0 {
// Eq
is_eq = true;
} else if *l == 6 {
// Within
is_within = true;
}
}
_ => { /*do nothing*/ }
}
};

if !is_eq {
if !is_eq && !is_within {
return Ok(None);
}

if let Some(item) = &operators.get(2).unwrap().item {
match item {
common_pb::expr_opr::Item::Const(c) => {
let idx_pred = pb::IndexPredicate {
or_predicates: vec![pb::index_predicate::AndPredicate {
predicates: vec![pb::index_predicate::Triplet {
key,
value: Some(c.clone().into()),
cmp: None,
}],
}],
};
return Ok(Some(idx_pred));
if is_within {
let or_predicates = match c.item.clone().unwrap() {
common_pb::value::Item::I32Array(array) => array
.item
.into_iter()
.map(|val| build_and_predicate(key.clone(), val.into()))
.collect(),
common_pb::value::Item::I64Array(array) => array
.item
.into_iter()
.map(|val| build_and_predicate(key.clone(), val.into()))
.collect(),
common_pb::value::Item::F64Array(array) => array
.item
.into_iter()
.map(|val| build_and_predicate(key.clone(), val.into()))
.collect(),
common_pb::value::Item::StrArray(array) => array
.item
.into_iter()
.map(|val| build_and_predicate(key.clone(), val.into()))
.collect(),
_ => Err(IrError::Unsupported(format!(
"unsupported value type for within: {:?}",
c
)))?,
};
return Ok(Some(pb::IndexPredicate { or_predicates }));
} else {
let idx_pred =
pb::IndexPredicate { or_predicates: vec![build_and_predicate(key, c.clone())] };
return Ok(Some(idx_pred));
}
}

common_pb::expr_opr::Item::Param(param) => {
let idx_pred = pb::IndexPredicate {
or_predicates: vec![pb::index_predicate::AndPredicate {
Expand All @@ -930,6 +958,14 @@ fn triplet_to_index_predicate(
Ok(None)
}

fn build_and_predicate(
key: Option<common_pb::Property>, value: common_pb::Value,
) -> pb::index_predicate::AndPredicate {
pb::index_predicate::AndPredicate {
predicates: vec![pb::index_predicate::Triplet { key, value: Some(value.into()), cmp: None }],
}
}

fn get_table_id_from_pb(schema: &Schema, name: &common_pb::NameOrId) -> Option<KeyId> {
name.item.as_ref().and_then(|item| match item {
common_pb::name_or_id::Item::Name(name) => schema.get_table_id(name),
Expand Down Expand Up @@ -2294,6 +2330,62 @@ mod test {
);
}

#[test]
fn scan_pred_to_idx_pred_with_within() {
let mut plan_meta = PlanMeta::default();
plan_meta.set_curr_node(0);
plan_meta.curr_node_meta_mut();
plan_meta.refer_to_nodes(0, vec![0]);
let meta = StoreMeta {
schema: Some(
Schema::from_json(std::fs::File::open("resource/modern_schema_pk.json").unwrap()).unwrap(),
),
};
let mut scan = pb::Scan {
scan_opt: 0,
alias: None,
params: Some(pb::QueryParams {
tables: vec!["person".into()],
columns: vec![],
is_all_columns: false,
limit: None,
predicate: Some(str_to_expr_pb("@.name within [\"John\", \"Josh\"]".to_string()).unwrap()),
sample_ratio: 1.0,
extra: HashMap::new(),
}),
idx_predicate: None,
is_count_only: false,
meta_data: None,
};

scan.preprocess(&meta, &mut plan_meta).unwrap();
assert!(scan.params.unwrap().predicate.is_none());
assert_eq!(
scan.idx_predicate.unwrap(),
pb::IndexPredicate {
or_predicates: vec![
pb::index_predicate::AndPredicate {
predicates: vec![pb::index_predicate::Triplet {
key: Some(common_pb::Property {
item: Some(common_pb::property::Item::Key("name".into())),
}),
value: Some("John".to_string().into()),
cmp: None,
}]
},
pb::index_predicate::AndPredicate {
predicates: vec![pb::index_predicate::Triplet {
key: Some(common_pb::Property {
item: Some(common_pb::property::Item::Key("name".into())),
}),
value: Some("Josh".to_string().into()),
cmp: None,
}]
}
]
}
);
}
#[test]
fn column_maintain_case1() {
let mut plan = LogicalPlan::with_root();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum SourceType {
pub struct SourceOperator {
query_params: QueryParams,
src: Option<HashMap<u64, Vec<ID>>>,
primary_key_values: Option<PKV>,
primary_key_values: Option<Vec<PKV>>,
alias: Option<KeyId>,
source_type: SourceType,
is_count_only: bool,
Expand Down Expand Up @@ -83,8 +83,12 @@ impl SourceOperator {
debug!("Runtime source op of indexed scan of global ids {:?}", source_op);
} else {
// query by indexed_scan
let primary_key_values = <Vec<(NameOrId, Object)>>::try_from(ip2)?;
source_op.primary_key_values = Some(PKV::from(primary_key_values));
let primary_key_values = <Vec<Vec<(NameOrId, Object)>>>::try_from(ip2)?;
let pkvs = primary_key_values
.into_iter()
.map(|pkv| PKV::from(pkv))
.collect();
source_op.primary_key_values = Some(pkvs);
debug!("Runtime source op of indexed scan {:?}", source_op);
}
Ok(source_op)
Expand Down Expand Up @@ -135,24 +139,24 @@ impl SourceOperator {
} else {
let mut v_source =
Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Vertex> + Send>;
if let Some(ref seeds) = self.src {
if let Some(seeds) = &self.src {
if let Some(src) = seeds.get(&(worker_index as u64)) {
if !src.is_empty() {
v_source = graph.get_vertex(src, &self.query_params)?;
}
}
} else if let Some(ref indexed_values) = self.primary_key_values {
} else if let Some(pkvs) = &self.primary_key_values {
if self.query_params.labels.is_empty() {
Err(FnGenError::unsupported_error(
"Empty label in `IndexScan` self.query_params.labels",
))?
}
let mut source_vertices = vec![];
for label in &self.query_params.labels {
if let Some(v) =
graph.index_scan_vertex(*label, indexed_values, &self.query_params)?
{
source_vertices.push(v);
for pkv in pkvs {
if let Some(v) = graph.index_scan_vertex(*label, pkv, &self.query_params)? {
source_vertices.push(v);
}
}
}
v_source = Box::new(source_vertices.into_iter());
Expand Down

0 comments on commit d065429

Please sign in to comment.