Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Fuse Source With Count to Optimize in Gremlin #3320

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@
}
}

if (op.isCountOnly()) {
irCoreLib.setCountOnly(scan, true);
}

Optional<OpArg> aliasOpt = baseOp.getAlias();
if (aliasOpt.isPresent()) {
FfiAlias.ByValue alias = (FfiAlias.ByValue) aliasOpt.get().applyArg();
Expand Down Expand Up @@ -807,102 +811,102 @@
}

/**
 * Appends one intermediate operator to the native FFI logical plan, dispatching on the
 * operator's concrete class to the matching irCoreLib append call.
 *
 * @param parentId id of the parent operator already in the plan
 * @param base the intermediate operator to append
 * @return id of the current (just appended) operator, possibly advanced by a post-alias op
 * @throws InterOpIllegalArgException if a required argument of an operator is missing
 * @throws InterOpUnsupportedException if the operator type has no FFI mapping yet
 * @throws AppendInterOpException if the FFI call reports a non-success result code
 */
private IntByReference appendInterOp(int parentId, InterOpBase base)
throws InterOpIllegalArgException, InterOpUnsupportedException, AppendInterOpException {
FfiResult error;
// oprId is in/out: it carries the parent id in and receives the new operator id back.
IntByReference oprId = new IntByReference(parentId);
if (ClassUtils.equalClass(base, ScanFusionOp.class)) {
Pointer ptrScan = TransformFactory.SCAN_FUSION_OP.apply(base);
error = irCoreLib.appendScanOperator(ptrPlan, ptrScan, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, SelectOp.class)) {
Pointer ptrSelect = TransformFactory.SELECT_OP.apply(base);
error = irCoreLib.appendSelectOperator(ptrPlan, ptrSelect, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, ExpandOp.class)) {
Pointer ptrExpand = TransformFactory.EXPAND_OP.apply(base);
error = irCoreLib.appendEdgexpdOperator(ptrPlan, ptrExpand, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, LimitOp.class)) {
Pointer ptrLimit = TransformFactory.LIMIT_OP.apply(base);
error = irCoreLib.appendLimitOperator(ptrPlan, ptrLimit, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, ProjectOp.class)) {
Pointer ptrProject = TransformFactory.PROJECT_OP.apply(base);
error = irCoreLib.appendProjectOperator(ptrPlan, ptrProject, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, OrderOp.class)) {
Pointer ptrOrder = TransformFactory.ORDER_OP.apply(base);
error = irCoreLib.appendOrderbyOperator(ptrPlan, ptrOrder, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, GroupOp.class)) {
Pointer ptrGroup = TransformFactory.GROUP_OP.apply(base);
error = irCoreLib.appendGroupbyOperator(ptrPlan, ptrGroup, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, DedupOp.class)) {
Pointer ptrDedup = TransformFactory.DEDUP_OP.apply(base);
error = irCoreLib.appendDedupOperator(ptrPlan, ptrDedup, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, SinkOp.class)) {
Pointer ptrSink = TransformFactory.SINK_OP.apply(base);
error = irCoreLib.appendSinkOperator(ptrPlan, ptrSink, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, PathExpandOp.class)) {
Pointer ptrPathXPd = TransformFactory.PATH_EXPAND_OP.apply(base);
error = irCoreLib.appendPathxpdOperator(ptrPlan, ptrPathXPd, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, GetVOp.class)) {
Pointer ptrGetV = TransformFactory.GETV_OP.apply(base);
error = irCoreLib.appendGetvOperator(ptrPlan, ptrGetV, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, ApplyOp.class)) {
// Apply carries a sub-plan: append it first as a detached root (parent -1) and
// record the sub-plan's root id back into the op before appending the apply itself.
ApplyOp applyOp = (ApplyOp) base;
Optional<OpArg> subOps = applyOp.getSubOpCollection();
if (!subOps.isPresent()) {
throw new InterOpIllegalArgException(
base.getClass(), "subOpCollection", "is not present in apply");
}
InterOpCollection opCollection = (InterOpCollection) subOps.get().applyArg();
Pair<Integer, Integer> oprIdPair = appendInterOpCollection(-1, opCollection);
applyOp.setSubRootId(
new OpArg(Integer.valueOf(oprIdPair.getValue0()), Function.identity()));

Pointer ptrApply = TransformFactory.APPLY_OP.apply(base);
error = irCoreLib.appendApplyOperator(ptrPlan, ptrApply, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, UnionOp.class)
|| ClassUtils.equalClass(base, SubGraphAsUnionOp.class)) {
// Union branches each form their own sub-plan rooted at this op's parent; the tail
// id of each branch becomes one of the union's parent ids.
UnionOp unionOp = (UnionOp) base;
Optional<OpArg> subOpsListOpt = unionOp.getSubOpCollectionList();
if (!subOpsListOpt.isPresent()) {
throw new InterOpIllegalArgException(
base.getClass(), "subOpCollectionList", "is not present in union");
}
List<InterOpCollection> subOpsList =
(List<InterOpCollection>) subOpsListOpt.get().applyArg();
if (subOpsList.isEmpty()) {
throw new InterOpIllegalArgException(
base.getClass(), "subOpCollectionList", "is empty in union");
}
List<Integer> unionParentIds = new ArrayList<>();
for (InterOpCollection opCollection : subOpsList) {
Pair<Integer, Integer> oprIdPair = appendInterOpCollection(parentId, opCollection);
unionParentIds.add(oprIdPair.getValue1());
}
unionOp.setParentIdList(new OpArg(unionParentIds, Function.identity()));
Pointer ptrUnion = TransformFactory.UNION_OP.apply(base);
error = irCoreLib.appendUnionOperator(ptrPlan, ptrUnion, oprId);
} else if (ClassUtils.equalClass(base, MatchOp.class)) {
Pointer ptrMatch = TransformFactory.MATCH_OP.apply(base);
error = irCoreLib.appendPatternOperator(ptrPlan, ptrMatch, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, UnfoldOp.class)) {
Pointer ptrUnfold = TransformFactory.UNFOLD_OP.apply(base);
error = irCoreLib.appendUnfoldOperator(ptrPlan, ptrUnfold, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, AsNoneOp.class)) {
Pointer ptrAs = TransformFactory.AS_NONE_OP.apply(base);
error = irCoreLib.appendAsOperator(ptrPlan, ptrAs, oprId.getValue(), oprId);
} else if (ClassUtils.equalClass(base, SampleOp.class)) {
Pointer ptrSample = TransformFactory.SAMPLE_OP.apply(base);
error = irCoreLib.appendSampleOperator(ptrPlan, ptrSample, oprId.getValue(), oprId);
} else {
throw new InterOpUnsupportedException(base.getClass(), "unimplemented yet");
}
// Union/Apply sub-plan appends return no aggregate error here, so error may be null.
if (error != null && error.code != ResultCode.Success) {
throw new AppendInterOpException(
base.getClass(), error.code.name() + ", msg is " + error.getMsg());
}
// add alias after the op if necessary
return setPostAlias(oprId.getValue(), base);
}

Check notice on line 909 in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java

View check run for this annotation

codefactor.io / CodeFactor

interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/IrPlan.java#L814-L909

Complex Method
private IntByReference setPostAlias(int parentId, InterOpBase base) {
IntByReference oprId = new IntByReference(parentId);
if (isPostAliasOp(base) && base.getAlias().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.graphscope.common.intermediate.process.SinkOutputProcessor;
import com.alibaba.graphscope.common.intermediate.process.SubGraphProjectProcessor;
import com.alibaba.graphscope.common.intermediate.strategy.InterOpStrategy;
import com.alibaba.graphscope.common.intermediate.strategy.SourceCountFusionStrategy;
import com.alibaba.graphscope.common.intermediate.strategy.TopKStrategy;
import com.alibaba.graphscope.common.intermediate.strategy.UnfoldFusionStrategy;

Expand All @@ -38,7 +39,10 @@
public class InterOpCollection {
private List<InterOpBase> opCollection;
private static List<InterOpStrategy> strategies =
Arrays.asList(TopKStrategy.INSTANCE, UnfoldFusionStrategy.INSTANCE);
Arrays.asList(
TopKStrategy.INSTANCE,
UnfoldFusionStrategy.INSTANCE,
SourceCountFusionStrategy.INSTANCE);
// order matters, process SubGraphProject before the SinkOutput
private static List<InterOpProcessor> processors =
Arrays.asList(SubGraphProjectProcessor.INSTANCE, SinkOutputProcessor.INSTANCE);
Expand All @@ -59,6 +63,10 @@ public void removeInterOp(int i) {
opCollection.remove(i);
}

// Replaces the operator at position i in place; the collection size is unchanged.
public void replaceInterOp(int i, InterOpBase op) {
opCollection.set(i, op);
}

public static void applyStrategies(InterOpCollection opCollection) {
opCollection
.unmodifiableCollection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ public Optional<QueryParams> getParams() {
return params;
}

// Whether this scan has been fused with a following count(), so the source may
// emit only the number of matching entries instead of the entries themselves.
public boolean isCountOnly() {
return isCountOnly;
}

// Set by SourceCountFusionStrategy when a source + count() pair is fused.
private boolean isCountOnly;

// Sets the scan option (wrapped as present); callers pass a non-null OpArg.
public void setScanOpt(OpArg scanOpt) {
this.scanOpt = Optional.of(scanOpt);
}
Expand All @@ -57,4 +63,8 @@ public void setIds(OpArg ids) {
// Sets the query parameters for this scan (wrapped as present).
public void setParams(QueryParams params) {
this.params = Optional.of(params);
}

// Marks (or clears) this scan as count-only; see isCountOnly().
public void setCountOnly(boolean isCountOnly) {
this.isCountOnly = isCountOnly;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.common.intermediate.strategy;

import com.alibaba.graphscope.common.intermediate.ArgAggFn;
import com.alibaba.graphscope.common.intermediate.InterOpCollection;
import com.alibaba.graphscope.common.intermediate.operator.*;
import com.alibaba.graphscope.common.jna.type.FfiAggOpt;
import com.google.common.collect.ImmutableList;

import org.javatuples.Pair;

import java.util.List;
import java.util.Optional;

/**
 * Fuses a source {@code ScanFusionOp} with an immediately following global {@code count()}
 * so the storage layer can answer the count directly instead of streaming every entry.
 * Because each source partition then yields only a partial count, the original
 * {@code count()} step is replaced by a {@code sum()} that merges those partials.
 */
public class SourceCountFusionStrategy implements InterOpStrategy {
    // The strategy is stateless, so a single shared, immutable instance suffices.
    public static final SourceCountFusionStrategy INSTANCE = new SourceCountFusionStrategy();

    private SourceCountFusionStrategy() {}

    @Override
    public void apply(InterOpCollection opCollection) {
        List<InterOpBase> original = opCollection.unmodifiableCollection();
        // Walk backwards so the in-place replacement at i + 1 never disturbs an index
        // that is still to be visited.
        for (int i = original.size() - 2; i >= 0; --i) {
            InterOpBase cur = original.get(i);
            ArgAggFn next = nextCount(original, i);
            // Only fuse an un-aliased scan: an aliased scan must still emit the actual
            // entries so later steps can refer to them.
            if (cur instanceof ScanFusionOp && !cur.getAlias().isPresent() && next != null) {
                ((ScanFusionOp) cur).setCountOnly(true);
                opCollection.replaceInterOp(i + 1, createSumOp(next));
            }
        }
    }

    // Returns the count() aggregate if the op right after cur is a global count(), else null.
    private ArgAggFn nextCount(List<InterOpBase> original, int cur) {
        int next = cur + 1;
        // cur is always >= 0, so only the upper bound needs checking.
        if (next < original.size() && original.get(next) instanceof GroupOp) {
            GroupOp groupOp = (GroupOp) original.get(next);
            Optional<OpArg> groupKeysOpt = groupOp.getGroupByKeys();
            Optional<OpArg> groupValuesOpt = groupOp.getGroupByValues();
            if (groupKeysOpt.isPresent()) {
                List<Pair> groupKeys = (List<Pair>) groupKeysOpt.get().applyArg();
                // groupKeys is empty means group by all
                if (groupKeys.isEmpty() && groupValuesOpt.isPresent()) {
                    List<ArgAggFn> groupValues = (List<ArgAggFn>) groupValuesOpt.get().applyArg();
                    if (groupValues.size() == 1
                            && groupValues.get(0).getAggregate() == FfiAggOpt.Count) {
                        return groupValues.get(0);
                    }
                }
            }
        }
        return null;
    }

    // Builds a global sum() over the partial counts, reusing the original count()'s alias
    // and variable so downstream steps are unaffected.
    private GroupOp createSumOp(ArgAggFn count) {
        GroupOp sum = new GroupOp();
        sum.setGroupByKeys(new OpArg(ImmutableList.of()));
        sum.setGroupByValues(
                new OpArg(
                        ImmutableList.of(
                                new ArgAggFn(FfiAggOpt.Sum, count.getAlias(), count.getVar()))));
        return sum;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ FfiResult.ByValue orEquivPredicate(

FfiResult.ByValue setScanAlias(Pointer scan, FfiAlias.ByValue alias);

// Sets the count-only flag on a native scan operator; mirrors set_count_only in ffi.rs.
FfiResult.ByValue setCountOnly(Pointer scan, boolean countOnly);

Pointer initEdgexpdOperator(FfiExpandOpt expandOpt, FfiDirection direction);

FfiResult.ByValue appendEdgexpdOperator(
Expand Down
23 changes: 16 additions & 7 deletions interactive_engine/executor/ir/core/src/plan/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,14 @@ pub extern "C" fn destroy_ffi_data(data: FfiData) {
}
}

/// Releases a C string previously handed to the caller across the FFI boundary.
///
/// # Safety
/// `cstr` must be null or a pointer originally produced by `CString::into_raw`;
/// it must not be used by the caller after this function returns.
#[no_mangle]
pub extern "C" fn destroy_cstr_pointer(cstr: *const c_char) {
if !cstr.is_null() {
// SAFETY: reconstructing the CString takes ownership back, so the allocation
// is dropped (freed) here exactly once.
let _ = unsafe { std::ffi::CString::from_raw(cstr as *mut c_char) };
}
}

/// To build a physical plan from the logical plan.
#[no_mangle]
pub extern "C" fn build_physical_plan(
Expand Down Expand Up @@ -1849,6 +1857,14 @@ mod scan {
set_alias(ptr_scan, alias, InnerOpt::Scan)
}

/// Marks the scan operator as "count only", i.e. the source should produce just the
/// number of matching entries rather than the entries themselves.
///
/// `ptr` must be null or point to a live `pb::Scan` created by this library; the
/// caller retains ownership. A null pointer is ignored.
#[no_mangle]
pub extern "C" fn set_count_only(ptr: *const c_void, is_count_only: i32) -> FfiResult {
    if !ptr.is_null() {
        // SAFETY: mutate through a borrowed reference. The previous
        // `Box::from_raw` + `mem::forget` round-trip briefly claimed ownership of
        // the allocation, which would double-free if anything panicked in between;
        // a plain `&mut *` has no such hazard and never frees.
        let scan = unsafe { &mut *(ptr as *mut pb::Scan) };
        scan.is_count_only = is_count_only != 0;
    }
    FfiResult::success()
}

/// Append a scan operator to the logical plan
#[no_mangle]
pub extern "C" fn append_scan_operator(
Expand Down Expand Up @@ -2233,13 +2249,6 @@ mod graph {
destroy_ptr::<pb::GetV>(ptr)
}

#[no_mangle]
// Releases a C string previously produced by `CString::into_raw`; null is a no-op.
// NOTE(review): this appears to duplicate the definition earlier in the diff — confirm
// only one copy survives in the merged file.
pub extern "C" fn destroy_cstr_pointer(cstr: *const c_char) {
if !cstr.is_null() {
// SAFETY: ownership returns to the CString, which frees the allocation on drop.
let _ = unsafe { std::ffi::CString::from_raw(cstr as *mut c_char) };
}
}

#[allow(dead_code)]
#[repr(i32)]
pub enum PathOpt {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,38 @@ impl ReadGraph for CSRStore {
}

fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_index % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
let count = self
.store
.count_all_vertices(label_ids.as_ref());
Ok(count as u64)
if params.filter.is_some() {
// the filter can not be pushed down to store,
// so we need to scan all vertices with filter and then count
Ok(self.scan_vertex(params)?.count() as u64)
} else {
Ok(0)
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_index % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
let count = self
.store
.count_all_vertices(label_ids.as_ref());
Ok(count as u64)
} else {
Ok(0)
}
}
}

fn count_edge(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_index % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
let count = self.store.count_all_edges(label_ids.as_ref());
Ok(count as u64)
if params.filter.is_some() {
Ok(self.scan_edge(params)?.count() as u64)
} else {
Ok(0)
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_index % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
let count = self.store.count_all_edges(label_ids.as_ref());
Ok(count as u64)
} else {
Ok(0)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,26 +367,36 @@ impl ReadGraph for ExpStore {
}

fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_idx = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_idx % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
Ok(self
.store
.count_all_vertices(label_ids.as_ref()) as u64)
if params.filter.is_some() {
// the filter can not be pushed down to exp_store,
// so we need to scan all vertices with filter and then count
Ok(self.scan_vertex(params)?.count() as u64)
} else {
Ok(0)
let worker_idx = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_idx % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
Ok(self
.store
.count_all_vertices(label_ids.as_ref()) as u64)
} else {
Ok(0)
}
}
}

fn count_edge(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_idx = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_idx % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
Ok(self.store.count_all_edges(label_ids.as_ref()) as u64)
if params.filter.is_some() {
Ok(self.scan_edge(params)?.count() as u64)
} else {
Ok(0)
let worker_idx = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
if worker_idx % workers_num == 0 {
let label_ids = encode_storage_label(&params.labels);
Ok(self.store.count_all_edges(label_ids.as_ref()) as u64)
} else {
Ok(0)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,40 +478,51 @@ where
}

fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count = store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Ok(count)
if params.filter.is_some() {
// the filter can not be pushed down to store,
// so we need to scan all vertices with filter and then count
Ok(self.scan_vertex(params)?.count() as u64)
} else {
Ok(0)
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count =
store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Ok(count)
} else {
Ok(0)
}
}
}

fn count_edge(&self, params: &QueryParams) -> GraphProxyResult<u64> {
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Ok(count)
if params.filter.is_some() {
Ok(self.scan_edge(params)?.count() as u64)
} else {
Ok(0)
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Ok(count)
} else {
Ok(0)
}
}
}
}
Expand Down
Loading