Skip to content

Commit

Permalink
refine the impl
Browse files Browse the repository at this point in the history
  • Loading branch information
BingqingLyu committed Sep 6, 2024
1 parent fe73696 commit 7771445
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,92 +34,12 @@ pub struct EdgeExpandOperator<E: Entry> {
alias: Option<KeyId>,
stmt: Box<dyn Statement<ID, E>>,
expand_opt: ExpandOpt,
is_optional: bool,
}

impl<E: Entry + 'static> FlatMapFunction<Record, Record> for EdgeExpandOperator<E> {
type Target = DynIter<Record>;

fn exec(&self, mut input: Record) -> FnResult<Self::Target> {
if let Some(entry) = input.get(self.start_v_tag) {
match entry.get_type() {
EntryType::Vertex => {
let id = entry.id();
let iter = self.stmt.exec(id)?;
match self.expand_opt {
// the case of expand edge, and get end vertex;
ExpandOpt::Vertex => {
let neighbors_iter = iter.map(|e| {
if let Some(e) = e.as_edge() {
Vertex::new(
e.get_other_id(),
e.get_other_label().cloned(),
DynDetails::default(),
)
} else {
unreachable!()
}
});
Ok(Box::new(RecordExpandIter::new(
input,
self.alias.as_ref(),
Box::new(neighbors_iter),
)))
}
// the case of expand neighbors, including edges/vertices
ExpandOpt::Edge => {
Ok(Box::new(RecordExpandIter::new(input, self.alias.as_ref(), iter)))
}
// the case of get degree. TODO: this case should be a `Map`
ExpandOpt::Degree => {
let degree = iter.count();
input.append(object!(degree), self.alias);
Ok(Box::new(vec![input].into_iter()))
}
}
}
EntryType::Path => {
let graph_path = entry
.as_graph_path()
.ok_or_else(|| FnExecError::Unreachable)?;
let iter = self.stmt.exec(graph_path.get_path_end().id())?;
let curr_path = graph_path.clone();
Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter)))
}
EntryType::Object => {
let obj = entry
.as_object()
.ok_or_else(|| FnExecError::Unreachable)?;
if Object::None.eq(obj) {
input.append(Object::None, self.alias);
Ok(Box::new(vec![input].into_iter()))
} else {
Err(FnExecError::unexpected_data_error(&format!(
"Cannot Expand from current entry {:?}",
entry
)))?
}
}
_ => Err(FnExecError::unexpected_data_error(&format!(
"Cannot Expand from current entry {:?}",
entry
)))?,
}
} else {
Ok(Box::new(vec![].into_iter()))
}
}
}

pub struct OptionalEdgeExpandOperator<E: Entry> {
start_v_tag: Option<KeyId>,
alias: Option<KeyId>,
stmt: Box<dyn Statement<ID, E>>,
expand_opt: ExpandOpt,
}

impl<E: Entry + 'static> FlatMapFunction<Record, Record> for OptionalEdgeExpandOperator<E> {
type Target = DynIter<Record>;

fn exec(&self, mut input: Record) -> FnResult<Self::Target> {
if let Some(entry) = input.get(self.start_v_tag) {
match entry.get_type() {
Expand All @@ -129,7 +49,7 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for OptionalEdgeExpandO
match self.expand_opt {
// the case of expand edge, and get end vertex;
ExpandOpt::Vertex => {
if iter.peek().is_none() {
if self.is_optional && iter.peek().is_none() {
input.append(Object::None, self.alias);
Ok(Box::new(vec![input].into_iter()))
} else {
Expand All @@ -153,7 +73,7 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for OptionalEdgeExpandO
}
// the case of expand neighbors, including edges/vertices
ExpandOpt::Edge => {
if iter.peek().is_none() {
if self.is_optional && iter.peek().is_none() {
input.append(Object::None, self.alias);
Ok(Box::new(vec![input].into_iter()))
} else {
Expand All @@ -164,17 +84,28 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for OptionalEdgeExpandO
)))
}
}
// the case of get degree. TODO: this case should be a `Map`
// the case of get degree.
ExpandOpt::Degree => {
let degree = iter.count();
input.append(object!(degree), self.alias);
Ok(Box::new(vec![input].into_iter()))
}
}
}
EntryType::Path => Err(FnExecError::unsupported_error(
"Have not supported Optional Edge Expand in Path entry yet",
))?,
EntryType::Path => {
if self.is_optional {
Err(FnExecError::unsupported_error(
"Have not supported Optional Edge Expand in Path entry yet",
))?
} else {
let graph_path = entry
.as_graph_path()
.ok_or_else(|| FnExecError::Unreachable)?;
let iter = self.stmt.exec(graph_path.get_path_end().id())?;
let curr_path = graph_path.clone();
Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter)))
}
}
EntryType::Object => {
let obj = entry
.as_object()
Expand Down Expand Up @@ -224,62 +155,39 @@ impl FlatMapFuncGen for pb::EdgeExpand {
// Expand vertices with filters on edges.
// This can be regarded as a combination of EdgeExpand (with is_edge = true) + GetV
let stmt = graph.prepare_explore_edge(direction, &query_params)?;
if self.is_optional {
let edge_expand_operator = OptionalEdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Vertex,
};
Ok(Box::new(edge_expand_operator))
} else {
let edge_expand_operator = EdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Vertex,
};
Ok(Box::new(edge_expand_operator))
}
let edge_expand_operator = EdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Vertex,
is_optional: self.is_optional,
};
Ok(Box::new(edge_expand_operator))
} else {
// Expand vertices without any filters
let stmt = graph.prepare_explore_vertex(direction, &query_params)?;
if self.is_optional {
let edge_expand_operator = OptionalEdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Edge,
};
Ok(Box::new(edge_expand_operator))
} else {
let edge_expand_operator = EdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Edge,
};
Ok(Box::new(edge_expand_operator))
}
}
}
_ => {
// Expand edges or degree
let stmt = graph.prepare_explore_edge(direction, &query_params)?;
if self.is_optional {
let edge_expand_operator = OptionalEdgeExpandOperator {
let edge_expand_operator = EdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt,
expand_opt: ExpandOpt::Edge,
is_optional: self.is_optional,
};
Ok(Box::new(edge_expand_operator))
} else {
let edge_expand_operator =
EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt };
Ok(Box::new(edge_expand_operator))
}
}
_ => {
// Expand edges or degree
let stmt = graph.prepare_explore_edge(direction, &query_params)?;
let edge_expand_operator = EdgeExpandOperator {
start_v_tag,
alias: edge_or_end_v_tag,
stmt,
expand_opt,
is_optional: self.is_optional,
};
Ok(Box::new(edge_expand_operator))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,8 @@ impl FilterMapFunction<Record, Record> for GetVertexOperator {
}
} else if let Some(obj) = entry.as_object() {
if Object::None.eq(obj) {
if self.query_labels.is_empty() {
input.append(Object::None, self.alias);
return Ok(Some(input));
} else {
return Ok(None);
}
input.append(Object::None, self.alias);
Ok(Some(input))
} else {
Err(FnExecError::unexpected_data_error(&format!(
"Can only apply `GetV` on an object that is not None. The entry is {:?}",
Expand Down Expand Up @@ -258,7 +254,6 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
} else if let Some(obj) = entry.as_object() {
if Object::None.eq(obj) {
if let Some(predicate) = &self.query_params.filter {
// TODO: eval by predicate instead of directly regarding it as false
let res = predicate
.eval_bool(Some(&input))
.map_err(|e| FnExecError::from(e))?;
Expand Down

0 comments on commit 7771445

Please sign in to comment.