diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java index 93bcc86fabcb..f74bb06c7a74 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java @@ -96,6 +96,10 @@ public Pointer apply(InterOpBase baseOp) { } } + if (op.isCountOnly()) { + irCoreLib.setCountOnly(scan, true); + } + Optional aliasOpt = baseOp.getAlias(); if (aliasOpt.isPresent()) { FfiAlias.ByValue alias = (FfiAlias.ByValue) aliasOpt.get().applyArg(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/InterOpCollection.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/InterOpCollection.java index 8b3667a001e3..9f210667f704 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/InterOpCollection.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/InterOpCollection.java @@ -24,6 +24,7 @@ import com.alibaba.graphscope.common.intermediate.process.SinkOutputProcessor; import com.alibaba.graphscope.common.intermediate.process.SubGraphProjectProcessor; import com.alibaba.graphscope.common.intermediate.strategy.InterOpStrategy; +import com.alibaba.graphscope.common.intermediate.strategy.SourceCountFusionStrategy; import com.alibaba.graphscope.common.intermediate.strategy.TopKStrategy; import com.alibaba.graphscope.common.intermediate.strategy.UnfoldFusionStrategy; @@ -38,7 +39,10 @@ public class InterOpCollection { private List opCollection; private static List strategies = - Arrays.asList(TopKStrategy.INSTANCE, UnfoldFusionStrategy.INSTANCE); + Arrays.asList( + TopKStrategy.INSTANCE, + UnfoldFusionStrategy.INSTANCE, + SourceCountFusionStrategy.INSTANCE); // order matters, process 
SubGraphProject before the SinkOutput private static List processors = Arrays.asList(SubGraphProjectProcessor.INSTANCE, SinkOutputProcessor.INSTANCE); @@ -59,6 +63,10 @@ public void removeInterOp(int i) { opCollection.remove(i); } + public void replaceInterOp(int i, InterOpBase op) { + opCollection.set(i, op); + } + public static void applyStrategies(InterOpCollection opCollection) { opCollection .unmodifiableCollection() diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/operator/ScanFusionOp.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/operator/ScanFusionOp.java index ac0a86c50d4c..d1e6f1b7dba7 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/operator/ScanFusionOp.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/operator/ScanFusionOp.java @@ -46,6 +46,12 @@ public Optional getParams() { return params; } + public boolean isCountOnly() { + return isCountOnly; + } + + private boolean isCountOnly; + public void setScanOpt(OpArg scanOpt) { this.scanOpt = Optional.of(scanOpt); } @@ -57,4 +63,8 @@ public void setIds(OpArg ids) { public void setParams(QueryParams params) { this.params = Optional.of(params); } + + public void setCountOnly(boolean isCountOnly) { + this.isCountOnly = isCountOnly; + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/strategy/SourceCountFusionStrategy.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/strategy/SourceCountFusionStrategy.java new file mode 100644 index 000000000000..8d2cfa227242 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/strategy/SourceCountFusionStrategy.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Alibaba Group Holding Limited. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphscope.common.intermediate.strategy; + +import com.alibaba.graphscope.common.intermediate.ArgAggFn; +import com.alibaba.graphscope.common.intermediate.InterOpCollection; +import com.alibaba.graphscope.common.intermediate.operator.*; +import com.alibaba.graphscope.common.jna.type.FfiAggOpt; +import com.google.common.collect.ImmutableList; + +import org.javatuples.Pair; + +import java.util.List; +import java.util.Optional; + +public class SourceCountFusionStrategy implements InterOpStrategy { + public static SourceCountFusionStrategy INSTANCE = new SourceCountFusionStrategy(); + + private SourceCountFusionStrategy() {} + + @Override + public void apply(InterOpCollection opCollection) { + List original = opCollection.unmodifiableCollection(); + for (int i = original.size() - 2; i >= 0; --i) { + InterOpBase cur = original.get(i); + ArgAggFn next = nextCount(original, i); + if (cur instanceof ScanFusionOp && !cur.getAlias().isPresent() && next != null) { + ((ScanFusionOp) cur).setCountOnly(true); + opCollection.replaceInterOp(i + 1, createSumOp(next)); + } + } + } + + // return ArgAggFn if next is count(), otherwise null + private ArgAggFn nextCount(List original, int cur) { + int next = cur + 1; + if (next >= 0 && next < original.size() && original.get(next) instanceof GroupOp) { + GroupOp groupOp = (GroupOp) original.get(next); + Optional groupKeysOpt = groupOp.getGroupByKeys(); + 
Optional groupValuesOpt = groupOp.getGroupByValues(); + if (groupKeysOpt.isPresent()) { + List groupKeys = (List) groupKeysOpt.get().applyArg(); + // groupKeys is empty means group by all + if (groupKeys.isEmpty() && groupValuesOpt.isPresent()) { + List groupValues = (List) groupValuesOpt.get().applyArg(); + if (groupValues.size() == 1 + && groupValues.get(0).getAggregate() == FfiAggOpt.Count) { + return groupValues.get(0); + } + } + } + } + return null; + } + + private GroupOp createSumOp(ArgAggFn count) { + GroupOp sum = new GroupOp(); + sum.setGroupByKeys(new OpArg(ImmutableList.of())); + sum.setGroupByValues( + new OpArg( + ImmutableList.of( + new ArgAggFn(FfiAggOpt.Sum, count.getAlias(), count.getVar())))); + return sum; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java index 2045c8eba10d..107d4b583611 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/jna/IrCoreLibrary.java @@ -64,6 +64,8 @@ FfiResult.ByValue orEquivPredicate( FfiResult.ByValue setScanAlias(Pointer scan, FfiAlias.ByValue alias); + FfiResult.ByValue setCountOnly(Pointer scan, boolean countOnly); + Pointer initEdgexpdOperator(FfiExpandOpt expandOpt, FfiDirection direction); FfiResult.ByValue appendEdgexpdOperator( diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index 1b48d77bff7b..692b774f3d64 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -613,6 +613,14 @@ pub extern "C" fn destroy_ffi_data(data: FfiData) { } } +// To release a cstr pointer +#[no_mangle] +pub extern "C" fn destroy_cstr_pointer(cstr: *const c_char) { + if !cstr.is_null() { + let _ = unsafe 
{ std::ffi::CString::from_raw(cstr as *mut c_char) }; + } +} + /// To build a physical plan from the logical plan. #[no_mangle] pub extern "C" fn build_physical_plan( @@ -1849,6 +1857,14 @@ mod scan { set_alias(ptr_scan, alias, InnerOpt::Scan) } + #[no_mangle] + pub extern "C" fn set_count_only(ptr: *const c_void, is_count_only: i32) -> FfiResult { + let mut scan = unsafe { Box::from_raw(ptr as *mut pb::Scan) }; + scan.is_count_only = is_count_only != 0; + std::mem::forget(scan); + FfiResult::success() + } + /// Append a scan operator to the logical plan #[no_mangle] pub extern "C" fn append_scan_operator( @@ -2233,13 +2249,6 @@ mod graph { destroy_ptr::(ptr) } - #[no_mangle] - pub extern "C" fn destroy_cstr_pointer(cstr: *const c_char) { - if !cstr.is_null() { - let _ = unsafe { std::ffi::CString::from_raw(cstr as *mut c_char) }; - } - } - #[allow(dead_code)] #[repr(i32)] pub enum PathOpt { diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs index 217095f85148..a63ecfdfe8cf 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/csr_store/read_graph.rs @@ -174,28 +174,38 @@ impl ReadGraph for CSRStore { } fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { - let worker_index = self.cluster_info.get_worker_index()?; - let workers_num = self.cluster_info.get_local_worker_num()?; - if worker_index % workers_num == 0 { - let label_ids = encode_storage_label(&params.labels); - let count = self - .store - .count_all_vertices(label_ids.as_ref()); - Ok(count as u64) + if params.filter.is_some() { + // the filter can not be pushed down to store, + // so we need to scan all vertices with filter and then count + Ok(self.scan_vertex(params)?.count() as u64) } else { - Ok(0) + let worker_index = self.cluster_info.get_worker_index()?; + let 
workers_num = self.cluster_info.get_local_worker_num()?; + if worker_index % workers_num == 0 { + let label_ids = encode_storage_label(&params.labels); + let count = self + .store + .count_all_vertices(label_ids.as_ref()); + Ok(count as u64) + } else { + Ok(0) + } } } fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { - let worker_index = self.cluster_info.get_worker_index()?; - let workers_num = self.cluster_info.get_local_worker_num()?; - if worker_index % workers_num == 0 { - let label_ids = encode_storage_label(&params.labels); - let count = self.store.count_all_edges(label_ids.as_ref()); - Ok(count as u64) + if params.filter.is_some() { + Ok(self.scan_edge(params)?.count() as u64) } else { - Ok(0) + let worker_index = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_index % workers_num == 0 { + let label_ids = encode_storage_label(&params.labels); + let count = self.store.count_all_edges(label_ids.as_ref()); + Ok(count as u64) + } else { + Ok(0) + } } } } diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs index ec6a4d70f2f4..671129883d8c 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/exp_store/read_graph.rs @@ -367,26 +367,36 @@ impl ReadGraph for ExpStore { } fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { - let worker_idx = self.cluster_info.get_worker_index()?; - let workers_num = self.cluster_info.get_local_worker_num()?; - if worker_idx % workers_num == 0 { - let label_ids = encode_storage_label(&params.labels); - Ok(self - .store - .count_all_vertices(label_ids.as_ref()) as u64) + if params.filter.is_some() { + // the filter can not be pushed down to exp_store, + // so we need to scan all vertices with filter and then count + 
Ok(self.scan_vertex(params)?.count() as u64) } else { - Ok(0) + let worker_idx = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_idx % workers_num == 0 { + let label_ids = encode_storage_label(&params.labels); + Ok(self + .store + .count_all_vertices(label_ids.as_ref()) as u64) + } else { + Ok(0) + } } } fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { - let worker_idx = self.cluster_info.get_worker_index()?; - let workers_num = self.cluster_info.get_local_worker_num()?; - if worker_idx % workers_num == 0 { - let label_ids = encode_storage_label(&params.labels); - Ok(self.store.count_all_edges(label_ids.as_ref()) as u64) + if params.filter.is_some() { + Ok(self.scan_edge(params)?.count() as u64) } else { - Ok(0) + let worker_idx = self.cluster_info.get_worker_index()?; + let workers_num = self.cluster_info.get_local_worker_num()?; + if worker_idx % workers_num == 0 { + let label_ids = encode_storage_label(&params.labels); + Ok(self.store.count_all_edges(label_ids.as_ref()) as u64) + } else { + Ok(0) + } } } } diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs index ed805c7c2d12..1ed166084f05 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/read_graph.rs @@ -478,40 +478,51 @@ where } fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult { - let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; - if !worker_partitions.is_empty() { - let store = self.store.clone(); - let si = params - .get_extra_param(SNAPSHOT_ID) - .map(|s| { - s.parse::() - .unwrap_or(DEFAULT_SNAPSHOT_ID) - }) - .unwrap_or(DEFAULT_SNAPSHOT_ID); - let label_ids = encode_storage_labels(params.labels.as_ref())?; - let count = 
store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref()); - Ok(count) + if params.filter.is_some() { + // the filter can not be pushed down to store, + // so we need to scan all vertices with filter and then count + Ok(self.scan_vertex(params)?.count() as u64) } else { - Ok(0) + let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; + if !worker_partitions.is_empty() { + let store = self.store.clone(); + let si = params + .get_extra_param(SNAPSHOT_ID) + .map(|s| { + s.parse::() + .unwrap_or(DEFAULT_SNAPSHOT_ID) + }) + .unwrap_or(DEFAULT_SNAPSHOT_ID); + let label_ids = encode_storage_labels(params.labels.as_ref())?; + let count = + store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref()); + Ok(count) + } else { + Ok(0) + } } } fn count_edge(&self, params: &QueryParams) -> GraphProxyResult { - let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; - if !worker_partitions.is_empty() { - let store = self.store.clone(); - let si = params - .get_extra_param(SNAPSHOT_ID) - .map(|s| { - s.parse::() - .unwrap_or(DEFAULT_SNAPSHOT_ID) - }) - .unwrap_or(DEFAULT_SNAPSHOT_ID); - let label_ids = encode_storage_labels(params.labels.as_ref())?; - let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref()); - Ok(count) + if params.filter.is_some() { + Ok(self.scan_edge(params)?.count() as u64) } else { - Ok(0) + let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?; + if !worker_partitions.is_empty() { + let store = self.store.clone(); + let si = params + .get_extra_param(SNAPSHOT_ID) + .map(|s| { + s.parse::() + .unwrap_or(DEFAULT_SNAPSHOT_ID) + }) + .unwrap_or(DEFAULT_SNAPSHOT_ID); + let label_ids = encode_storage_labels(params.labels.as_ref())?; + let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref()); + Ok(count) + } else { + Ok(0) + } } } 
} diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 1304e6ccf192..6d37ac326f2c 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -46,6 +46,8 @@ pub struct SourceOperator { primary_key_values: Option>, alias: Option, source_type: SourceType, + // to specify if the source is a fusion of scan and count + // currently, it may fuse: 1) scan + count; 2) index_scan + count is_count_only: bool, } @@ -133,59 +135,77 @@ impl SourceOperator { match self.source_type { SourceType::Vertex => { - if self.is_count_only { - let count = graph.count_vertex(&self.query_params)?; - Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) - } else { - let mut v_source = - Box::new(std::iter::empty()) as Box + Send>; - if let Some(seeds) = &self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - v_source = graph.get_vertex(src, &self.query_params)?; - } + let mut v_source = Box::new(std::iter::empty()) as Box + Send>; + if let Some(seeds) = &self.src { + if let Some(src) = seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + v_source = graph.get_vertex(src, &self.query_params)?; } - } else if let Some(pkvs) = &self.primary_key_values { - if self.query_params.labels.is_empty() { - Err(FnGenError::unsupported_error( - "Empty label in `IndexScan` self.query_params.labels", - ))? - } - let mut source_vertices = vec![]; - for label in &self.query_params.labels { - for pkv in pkvs { - if let Some(v) = graph.index_scan_vertex(*label, pkv, &self.query_params)? 
{ - source_vertices.push(v); - } + } + if self.is_count_only { + let count = v_source.count() as u64; + return Ok(Box::new( + vec![Record::new(object!(count), self.alias.clone())].into_iter(), + )); + } + } else if let Some(pkvs) = &self.primary_key_values { + if self.query_params.labels.is_empty() { + Err(FnGenError::unsupported_error( + "Empty label in `IndexScan` self.query_params.labels", + ))? + } + let mut source_vertices = vec![]; + for label in &self.query_params.labels { + for pkv in pkvs { + if let Some(v) = graph.index_scan_vertex(*label, pkv, &self.query_params)? { + source_vertices.push(v); } } - v_source = Box::new(source_vertices.into_iter()); + } + if self.is_count_only { + let count = source_vertices.len() as u64; + return Ok(Box::new( + vec![Record::new(object!(count), self.alias.clone())].into_iter(), + )); + } + v_source = Box::new(source_vertices.into_iter()); + } else { + if self.is_count_only { + let count = graph.count_vertex(&self.query_params)?; + return Ok(Box::new( + vec![Record::new(object!(count), self.alias.clone())].into_iter(), + )); } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions v_source = graph.scan_vertex(&self.query_params)?; - }; - Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) - } + } + }; + Ok(Box::new(v_source.map(move |v| Record::new(v, self.alias.clone())))) } SourceType::Edge => { - if self.is_count_only { - let count = graph.count_edge(&self.query_params)?; - Ok(Box::new(vec![Record::new(object!(count), self.alias.clone())].into_iter())) - } else { - let mut e_source = - Box::new(std::iter::empty()) as Box + Send>; - if let Some(ref seeds) = self.src { - if let Some(src) = seeds.get(&(worker_index as u64)) { - if !src.is_empty() { - e_source = graph.get_edge(src, &self.query_params)?; - } + let mut e_source = Box::new(std::iter::empty()) as Box + Send>; + if let Some(ref seeds) = self.src { + if let Some(src) = 
seeds.get(&(worker_index as u64)) { + if !src.is_empty() { + e_source = graph.get_edge(src, &self.query_params)?; } + } + if self.is_count_only { + let count = e_source.count() as u64; + return Ok(Box::new( + vec![Record::new(object!(count), self.alias.clone())].into_iter(), + )); + } + } else { + if self.is_count_only { + let count = graph.count_edge(&self.query_params)?; + return Ok(Box::new( + vec![Record::new(object!(count), self.alias.clone())].into_iter(), + )); } else { - // parallel scan, and each worker should scan the partitions assigned to it in self.e_params.partitions e_source = graph.scan_edge(&self.query_params)?; } - Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) } + Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone())))) } SourceType::Table => Err(FnGenError::unsupported_error(