Skip to content

Commit

Permalink
[IR Runtime] optimize the case of label filtering in Auxilia
Browse files Browse the repository at this point in the history
Committed-by: bingqing.lbq from Dev container
  • Loading branch information
BingqingLyu committed Nov 16, 2023
1 parent 75fb73b commit eefda74
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,12 @@ impl QueryParams {
None
}
}

// is_queryable doesn't consider tables as we assume that the table info can be inferred directly from current data.
pub fn is_queryable(&self) -> bool {
!(self.filter.is_none()
&& self.limit.is_none()
&& self.sample_ratio.is_none()
&& self.columns.is_none())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl TryFrom<pb::index_predicate::AndPredicate> for Predicates {
}

predicates.ok_or_else(|| {
(ParsePbError::ParseError("invalid `AndPredicate` in `IndexPredicate`".to_string()))
ParsePbError::ParseError("invalid `AndPredicate` in `IndexPredicate`".to_string())
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,79 +159,86 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
// e.g., for g.V().out().as("a").has("name", "marko"), we should compile as:
// g.V().out().auxilia(as("a"))... where we give alias in auxilia,
// then we set tag=None and alias="a" in auxilia
// 1. filter by labels.
if !self.query_params.labels.is_empty() && entry.label().is_some() {
if !self

// 1. If only need to filter by labels, and the entry itself carries label information already, directly eval it without query the store
if !self.query_params.labels.is_empty()
&& !self.query_params.is_queryable()
&& entry.label().is_some()
{
if self
.query_params
.labels
.contains(&entry.label().unwrap())
{
return Ok(None);
Ok(Some(input))
} else {
Ok(None)
}
}
// 2. further fetch properties, e.g., filter by columns.
match entry.get_type() {
EntryType::Vertex => {
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
let id = entry.id();
if let Some(vertex) = graph
.get_vertex(&[id], &self.query_params)?
.next()
.map(|vertex| DynEntry::new(vertex))
{
if let Some(alias) = self.alias {
// append without moving head
input
.get_columns_mut()
.insert(alias as usize, vertex.into());
} else {
// 2. Otherwise, filter after query store, e.g., the case of filter by columns.
match entry.get_type() {
EntryType::Vertex => {
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
let id = entry.id();
if let Some(vertex) = graph
.get_vertex(&[id], &self.query_params)?
.next()
.map(|vertex| DynEntry::new(vertex))
{
if let Some(alias) = self.alias {
// append without moving head
input
.get_columns_mut()
.insert(alias as usize, vertex.into());
} else {
input.append(vertex, self.alias.clone());
}
} else {
input.append(vertex, self.alias.clone());
return Ok(None);
}
} else {
return Ok(None);
}
}
EntryType::Edge => {
// TODO: This is a little bit tricky. Modify this logic to query store with eid when supported.
// Currently, when getting properties from an edge,
// we assume that it has already been carried in the edge (when the first time queried the edge)
// since on most storages, query edges by eid is not supported yet.
if self.tag.eq(&self.alias) {
// do nothing as we assume properties is already carried
} else {
let entry = entry.clone();
if let Some(alias) = self.alias {
// append without moving head
input
.get_columns_mut()
.insert(alias as usize, entry);
EntryType::Edge => {
// TODO: This is a little bit tricky. Modify this logic to query store with eid when supported.
// Currently, when getting properties from an edge,
// we assume that it has already been carried in the edge (when the first time queried the edge)
// since on most storages, query edges by eid is not supported yet.
if self.tag.eq(&self.alias) {
// do nothing as we assume properties is already carried
} else {
input.append_arc_entry(entry, self.alias.clone());
let entry = entry.clone();
if let Some(alias) = self.alias {
// append without moving head
input
.get_columns_mut()
.insert(alias as usize, entry);
} else {
input.append_arc_entry(entry, self.alias.clone());
}
}
}
}
EntryType::Path => {
// Auxilia for vertices in Path is for filtering.
let graph_path = entry
.as_graph_path()
.ok_or_else(|| FnExecError::Unreachable)?;
let path_end = graph_path.get_path_end();
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
let id = path_end.id();
if graph
.get_vertex(&[id], &self.query_params)?
.next()
.is_none()
{
return Ok(None);
EntryType::Path => {
// Auxilia for vertices in Path is for filtering.
let graph_path = entry
.as_graph_path()
.ok_or_else(|| FnExecError::Unreachable)?;
let path_end = graph_path.get_path_end();
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
let id = path_end.id();
if graph
.get_vertex(&[id], &self.query_params)?
.next()
.is_none()
{
return Ok(None);
}
}
}
_ => Err(FnExecError::unexpected_data_error(&format!(
_ => Err(FnExecError::unexpected_data_error(&format!(
"neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",
entry
)))?,
};
Ok(Some(input))
};
Ok(Some(input))
}
} else {
Ok(None)
}
Expand Down

0 comments on commit eefda74

Please sign in to comment.