Skip to content

Commit

Permalink
minor refine
Browse files Browse the repository at this point in the history
  • Loading branch information
BingqingLyu committed Sep 21, 2023
1 parent 98b740e commit 225eb97
Showing 1 changed file with 54 additions and 68 deletions.
122 changes: 54 additions & 68 deletions interactive_engine/executor/ir/runtime/src/process/operator/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ pub enum SourceType {
Vertex,
Edge,
Table,
VCount,
ECount,
TableCount,
Dummy,
}

Expand All @@ -49,6 +46,7 @@ pub struct SourceOperator {
primary_key_values: Option<PKV>,
alias: Option<KeyId>,
source_type: SourceType,
is_count_only: bool,
}

impl Default for SourceOperator {
Expand All @@ -59,6 +57,7 @@ impl Default for SourceOperator {
primary_key_values: None,
alias: None,
source_type: SourceType::Dummy,
is_count_only: false,
}
}
}
Expand Down Expand Up @@ -130,57 +129,62 @@ impl SourceOperator {

match self.source_type {
SourceType::Vertex => {
let mut v_source = Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Vertex> + Send>;
if let Some(ref seeds) = self.src {
if let Some(src) = seeds.get(&(worker_index as u64)) {
if !src.is_empty() {
v_source = graph.get_vertex(src, &self.query_params)?;
if self.is_count_only {
let count = graph.count_vertex(&self.query_params)?;
Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter()))
} else {
let mut v_source =
Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Vertex> + Send>;
if let Some(ref seeds) = self.src {
if let Some(src) = seeds.get(&(worker_index as u64)) {
if !src.is_empty() {
v_source = graph.get_vertex(src, &self.query_params)?;
}
}
}
} else if let Some(ref indexed_values) = self.primary_key_values {
if self.query_params.labels.is_empty() {
Err(FnGenError::unsupported_error(
"Empty label in `IndexScan` self.query_params.labels",
))?
}
let mut source_vertices = vec![];
for label in &self.query_params.labels {
if let Some(v) =
graph.index_scan_vertex(*label, indexed_values, &self.query_params)?
{
source_vertices.push(v);
} else if let Some(ref indexed_values) = self.primary_key_values {
if self.query_params.labels.is_empty() {
Err(FnGenError::unsupported_error(
"Empty label in `IndexScan` self.query_params.labels",
))?
}
}
v_source = Box::new(source_vertices.into_iter());
} else {
// parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions
v_source = graph.scan_vertex(&self.query_params)?;
};
Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone()))))
let mut source_vertices = vec![];
for label in &self.query_params.labels {
if let Some(v) =
graph.index_scan_vertex(*label, indexed_values, &self.query_params)?
{
source_vertices.push(v);
}
}
v_source = Box::new(source_vertices.into_iter());
} else {
// parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions
v_source = graph.scan_vertex(&self.query_params)?;
};
Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone()))))
}
}
SourceType::Edge => {
let mut e_source = Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + Send>;
if let Some(ref seeds) = self.src {
if let Some(src) = seeds.get(&(worker_index as u64)) {
if !src.is_empty() {
e_source = graph.get_edge(src, &self.query_params)?;
if self.is_count_only {
let count = graph.count_edge(&self.query_params)?;
Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter()))
} else {
let mut e_source =
Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + Send>;
if let Some(ref seeds) = self.src {
if let Some(src) = seeds.get(&(worker_index as u64)) {
if !src.is_empty() {
e_source = graph.get_edge(src, &self.query_params)?;
}
}
} else {
// parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions
e_source = graph.scan_edge(&self.query_params)?;
}
} else {
// parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions
e_source = graph.scan_edge(&self.query_params)?;
Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone()))))
}
Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone()))))
}
SourceType::VCount => {
let count = graph.count_vertex(&self.query_params)?;
Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter()))
}
SourceType::ECount => {
let count = graph.count_edge(&self.query_params)?;
Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter()))
}
SourceType::Table | SourceType::TableCount => Err(FnGenError::unsupported_error(

SourceType::Table => Err(FnGenError::unsupported_error(
"neither `Edge` nor `Vertex` but `Table` type `Source` opr",
))?,
SourceType::Dummy => {
Expand All @@ -197,29 +201,10 @@ impl TryFrom<pb::Scan> for SourceOperator {

fn try_from(scan_pb: pb::Scan) -> Result<Self, Self::Error> {
let scan_opt: algebra_pb::scan::ScanOpt = unsafe { ::std::mem::transmute(scan_pb.scan_opt) };
let is_count_only = scan_pb.is_count_only;
let source_type = match scan_opt {
algebra_pb::scan::ScanOpt::Vertex => {
if is_count_only {
SourceType::VCount
} else {
SourceType::Vertex
}
}
algebra_pb::scan::ScanOpt::Edge => {
if is_count_only {
SourceType::ECount
} else {
SourceType::Edge
}
}
algebra_pb::scan::ScanOpt::Table => {
if is_count_only {
SourceType::TableCount
} else {
SourceType::Table
}
}
algebra_pb::scan::ScanOpt::Vertex => SourceType::Vertex,
algebra_pb::scan::ScanOpt::Edge => SourceType::Edge,
algebra_pb::scan::ScanOpt::Table => SourceType::Table,
};
let query_params = QueryParams::try_from(scan_pb.params)?;
Ok(SourceOperator {
Expand All @@ -228,6 +213,7 @@ impl TryFrom<pb::Scan> for SourceOperator {
primary_key_values: None,
alias: scan_pb.alias,
source_type,
is_count_only: scan_pb.is_count_only,
})
}
}

0 comments on commit 225eb97

Please sign in to comment.