From 225eb97a4241ef6afcc72725cf0fae9855962326 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Thu, 21 Sep 2023 19:31:30 +0800 Subject: [PATCH] minor refine --- .../ir/runtime/src/process/operator/source.rs | 122 ++++++++---------- 1 file changed, 54 insertions(+), 68 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 32a264cfae3d..76f9179ca349 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -35,9 +35,6 @@ pub enum SourceType { Vertex, Edge, Table, - VCount, - ECount, - TableCount, Dummy, } @@ -49,6 +46,7 @@ pub struct SourceOperator { primary_key_values: Option, alias: Option, source_type: SourceType, + is_count_only: bool, } impl Default for SourceOperator { @@ -59,6 +57,7 @@ impl Default for SourceOperator { primary_key_values: None, alias: None, source_type: SourceType::Dummy, + is_count_only: false, } } } @@ -130,57 +129,62 @@ impl SourceOperator { match self.source_type { SourceType::Vertex => { - let mut v_source = Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - v_source = graph.get_vertex(src, &self.query_params)?; + if self.is_count_only { + let count = graph.count_vertex(&self.query_params)?; + Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) + } else { + let mut v_source = + Box::new(std::iter::empty()) as Box + Send>; + if let Some(ref seeds) = self.src { + if let Some(src) = seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + v_source = graph.get_vertex(src, &self.query_params)?; + } } - } - } else if let Some(ref indexed_values) = self.primary_key_values { - if self.query_params.labels.is_empty() { - Err(FnGenError::unsupported_error( - "Empty label in `IndexScan` self.query_params.labels", - ))? - } - let mut source_vertices = vec![]; - for label in &self.query_params.labels { - if let Some(v) = - graph.index_scan_vertex(*label, indexed_values, &self.query_params)? - { - source_vertices.push(v); + } else if let Some(ref indexed_values) = self.primary_key_values { + if self.query_params.labels.is_empty() { + Err(FnGenError::unsupported_error( + "Empty label in `IndexScan` self.query_params.labels", + ))? } - } - v_source = Box::new(source_vertices.into_iter()); - } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions - v_source = graph.scan_vertex(&self.query_params)?; - }; - Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) + let mut source_vertices = vec![]; + for label in &self.query_params.labels { + if let Some(v) = + graph.index_scan_vertex(*label, indexed_values, &self.query_params)? + { + source_vertices.push(v); + } + } + v_source = Box::new(source_vertices.into_iter()); + } else { + // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions + v_source = graph.scan_vertex(&self.query_params)?; + }; + Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) + } } SourceType::Edge => { - let mut e_source = Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - e_source = graph.get_edge(src, &self.query_params)?; + if self.is_count_only { + let count = graph.count_edge(&self.query_params)?; + Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) + } else { + let mut e_source = + Box::new(std::iter::empty()) as Box + Send>; + if let Some(ref seeds) = self.src { + if let Some(src) = seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + e_source = graph.get_edge(src, &self.query_params)?; + } } + } else { + // parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions + e_source = graph.scan_edge(&self.query_params)?; } - } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions - e_source = graph.scan_edge(&self.query_params)?; + Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) } - Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) - } - SourceType::VCount => { - let count = graph.count_vertex(&self.query_params)?; - Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) } - SourceType::ECount => { - let count = graph.count_edge(&self.query_params)?; - Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) - } - SourceType::Table | SourceType::TableCount => Err(FnGenError::unsupported_error( + + SourceType::Table => Err(FnGenError::unsupported_error( "neither `Edge` nor `Vertex` but `Table` type `Source` opr", ))?, SourceType::Dummy => { @@ -197,29 +201,10 @@ impl TryFrom for SourceOperator { fn try_from(scan_pb: pb::Scan) -> Result { let scan_opt: algebra_pb::scan::ScanOpt = unsafe { ::std::mem::transmute(scan_pb.scan_opt) }; - let is_count_only = scan_pb.is_count_only; let source_type = match scan_opt { - algebra_pb::scan::ScanOpt::Vertex => { - if is_count_only { - SourceType::VCount - } else { - SourceType::Vertex - } - } - algebra_pb::scan::ScanOpt::Edge => { - if is_count_only { - SourceType::ECount - } else { - SourceType::Edge - } - } - algebra_pb::scan::ScanOpt::Table => { - if is_count_only { - SourceType::TableCount - } else { - SourceType::Table - } - } + algebra_pb::scan::ScanOpt::Vertex => SourceType::Vertex, + algebra_pb::scan::ScanOpt::Edge => SourceType::Edge, + algebra_pb::scan::ScanOpt::Table => SourceType::Table, }; let query_params = QueryParams::try_from(scan_pb.params)?; Ok(SourceOperator { @@ -228,6 +213,7 @@ impl TryFrom for SourceOperator { primary_key_values: None, alias: scan_pb.alias, source_type, + is_count_only: scan_pb.is_count_only, }) } }