Skip to content

Commit

Permalink
Merge branch 'main' into ir_query_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 authored Nov 23, 2023
2 parents e12b53e + 72b5cdc commit c4dce7d
Show file tree
Hide file tree
Showing 49 changed files with 325 additions and 89 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker/graphscope-store.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update -y && \
apt-get install -y sudo default-jdk tzdata && \
apt-get clean -y && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
############### RUNTIME: frontend && executor #######################
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

ADD artifacts/artifacts.tar.gz /opt/graphscope/

RUN apt-get update -y && \
Expand Down
10 changes: 9 additions & 1 deletion charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,13 @@ data:
export LOG_NAME=graphscope-store
export GROOT_CONF_FILE=/etc/groot/groot.config
${GRAPHSCOPE_HOME}/groot/bin/store_ctl.sh start ${ROLE}
# For core and heap profiling
# ulimit -c unlimited
# sudo mkdir -p /apsara/cloud/data/corefile/ && sudo chown -R graphscope:graphscope /apsara/cloud/data/corefile/
# export _RJEM_MALLOC_CONF=prof:true,lg_prof_interval:32,lg_prof_sample:19
# export MALLOC_CONF=prof:true,lg_prof_interval:32
export RUST_BACKTRACE=1
${GRAPHSCOPE_HOME}/groot/bin/store_ctl.sh start ${ROLE} # || sleep infinity
{{- end -}}
7 changes: 6 additions & 1 deletion flex/codegen/src/hqps_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@ void build_fused_edge_get_v(
CHECK(vertex_labels.size() > 0);
edge_expand_op.set_expand_opt(
physical::EdgeExpand::ExpandOpt::EdgeExpand_ExpandOpt_VERTEX);
edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value());
if (get_v_op.has_alias()) {
edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value());
} else {
edge_expand_op.mutable_alias()->set_value(-1);
}

ss << _4_SPACES
<< BuildEdgeExpandOp<LabelT>(ctx, edge_expand_op, edge_meta_data,
vertex_labels)
Expand Down
28 changes: 18 additions & 10 deletions flex/engines/hqps_db/database/adj_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ class SinglePropGetter {
using value_type = T;
static constexpr size_t prop_num = 1;
SinglePropGetter() {}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {
CHECK(column.get() != nullptr);
}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {}

inline value_type get_view(vid_t vid) const {
if (vid == NONE) {
if (vid == NONE || column == nullptr) {
return NullRecordCreator<value_type>::GetNull();
}
return column->get_view(vid);
Expand Down Expand Up @@ -149,15 +147,25 @@ class MultiPropGetter {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
}
return get_view(vid, std::make_index_sequence<sizeof...(T)>());
result_tuple_t ret;
fill_result_tuple(ret, vid);
return ret;
}

template <size_t... Is>
inline result_tuple_t get_view(vid_t vid, std::index_sequence<Is...>) const {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
template <size_t I = 0>
inline typename std::enable_if<I == sizeof...(T), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {}

template <size_t I = 0>
inline typename std::enable_if<(I < sizeof...(T)), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {
using cur_ele_t = typename std::tuple_element<I, result_tuple_t>::type;
if (std::get<I>(column) == nullptr) {
std::get<I>(ret) = NullRecordCreator<cur_ele_t>::GetNull();
} else {
std::get<I>(ret) = std::get<I>(column)->get_view(vid);
}
return std::make_tuple(std::get<Is>(column)->get_view(vid)...);
fill_result_tuple<I + 1>(ret, vid);
}

inline MultiPropGetter<T...>& operator=(const MultiPropGetter<T...>& d) {
Expand Down
2 changes: 2 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)
Return count(author);
2 changes: 2 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)<-[:REPLYOF]-(msg2:POST|COMMENT)
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)
where msg1.length > $len
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)<-[:REPLYOF]-(msg2:POST|COMMENT)
where msg2.length > $len
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[:KNOWS]->(p2:PERSON)
Where p1.id = $id1 and p2.id = $id2
Return count(p1);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[:KNOWS]->(p2:PERSON)-[:LIKES]->(comment:COMMENT)
Where p1.id = $id1 and p2.id = $id2 and comment.length > $len
Return count(p1);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[k:KNOWS]->(p2:PERSON)
Where k.creationDate > $date1 and k.creationDate < $date2
Return count(p2);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[k:KNOWS]->(p2:PERSON)-[:LIKES]->(comment:COMMENT)
Where k.creationDate > $date1 and k.creationDate < $date2
Return count(p1);
4 changes: 4 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Match (message:COMMENT|POST)-[:HASCREATOR]->(person:PERSON),
(message:COMMENT|POST)-[:HASTAG]->(tag:TAG),
(person:PERSON) -[:HASINTEREST]->(tag:TAG)
Return count(person);
5 changes: 5 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Match (person1:PERSON)-[:LIKES]->(message:COMMENT|POST),
(message:COMMENT|POST)-[:HASCREATOR]->(person2:PERSON),
(person1:PERSON)-[:ISLOCATEDIN]->(place:PLACE),
(person2:PERSON) -[:ISLOCATEDIN]->(place:PLACE)
Return count(person1);
5 changes: 5 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Match (person1:PERSON)<-[:HASCREATOR]-(comment:COMMENT),
(comment:COMMENT)-[:REPLYOF]->(post:POST),
(post:POST)<-[:CONTAINEROF]-(forum:FORUM),
(forum:FORUM)-[:HASMEMBER]->(person2:PERSON)
Return count(person1);
7 changes: 7 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Match (forum:FORUM)-[:CONTAINEROF]->(post:POST),
(forum:FORUM)-[:HASMEMBER]->(person1:PERSON),
(forum:FORUM)-[:HASMEMBER]->(person2:PERSON),
(person1:PERSON)-[:KNOWS]->(person2:PERSON),
(person1:PERSON)-[:LIKES]->(post:POST),
(person2:PERSON)-[:LIKES]->(post:POST)
Return count(person1);
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest {

public abstract Traversal<Vertex, Long> get_pattern_16_test();

public abstract Traversal<Vertex, Long> get_pattern_17_test();

@Test
public void run_pattern_1_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_1_test();
Expand Down Expand Up @@ -170,6 +172,13 @@ public void run_pattern_16_test() {
Assert.assertEquals(23286L, traversal.next().longValue());
}

@Test
public void run_pattern_17_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_17_test();
this.printTraversalForm(traversal);
Assert.assertEquals(17367L, traversal.next().longValue());
}

public static class Traversals extends PatternQueryTest {

// PM1
Expand Down Expand Up @@ -356,5 +365,26 @@ public Traversal<Vertex, Long> get_pattern_16_test() {
.by("firstName")
.count();
}

@Override
public Traversal<Vertex, Long> get_pattern_17_test() {
return g.V().match(
__.as("a")
.hasLabel("PERSON")
.out("HASINTEREST")
.hasLabel("TAG")
.as("b"),
__.as("a")
.hasLabel("PERSON")
.in("HASCREATOR")
.hasLabel("COMMENT", "POST")
.as("c"),
__.as("c")
.hasLabel("COMMENT", "POST")
.out("HASTAG")
.hasLabel("TAG")
.as("b"))
.count();
}
}
}
5 changes: 5 additions & 0 deletions interactive_engine/executor/assembly/groot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ runtime = {path = "../../ir/runtime"}
graph_proxy = {path = "../../ir/graph_proxy", features = ["with_global_query"]}
log4rs = "1.2"
tokio = { version = "1.24", features = ["macros", "sync"] }
tikv-jemallocator = {version = "0.5", default_features=false, features = ["profiling", "disable_initial_exec_tls"] }

[features]
column_filter_push_down = []

[profile.dev]
# TODO(siyuan): re-enable debug assertions by addressing the reports for misaligned pointer dereferences https://github.com/rust-lang/rust/pull/98112/
debug-assertions = false

[profile.release-with-debug-info]
inherits = "release"
debug = true
4 changes: 4 additions & 0 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use crate::store::jna_response::JnaResponse;
pub type GraphHandle = *const c_void;
pub type PartitionGraphHandle = *const c_void;
pub type FfiPartitionGraph = WrapperPartitionGraph<GraphStore>;
use tikv_jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

static INIT: Once = Once::new();

Expand Down
5 changes: 2 additions & 3 deletions interactive_engine/executor/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ if [ "$MODE" = "debug" ]; then
elif [ "$MODE" = "release" ]; then
cargo build --release $append
else
echo "Invalid mode, choose from debug or release."
exit 1
cargo build --profile $MODE $append
fi

if [ "$TARGET" = "groot" ]; then
strip ${STRIP_OPTION} $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX}
# strip ${STRIP_OPTION} $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX}
ln -sf $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX} $(pwd)/target/libgroot_ffi.${SUFFIX}
fi
4 changes: 3 additions & 1 deletion interactive_engine/executor/engine/pegasus/common/src/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ impl<T: std::fmt::Debug> std::fmt::Debug for RcPointer<T> {
impl<T: ?Sized> Drop for RcPointer<T> {
fn drop(&mut self) {
if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) }
unsafe {
let _ = Box::<T>::from_raw(self.ptr.as_ptr());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl From<std::io::Error> for StartupError {
#[derive(Debug)]
pub enum CancelError {
JobNotFoundError(u64),
CancelMapPoisonedError,
}

impl Display for CancelError {
Expand All @@ -351,6 +352,9 @@ impl Display for CancelError {
CancelError::JobNotFoundError(e) => {
write!(f, "fail to find job, job id: {};", e)
}
CancelError::CancelMapPoisonedError => {
write!(f, "JOB_CANCEL_MAP is poisoned!;")
}
}
}
}
Expand Down
20 changes: 16 additions & 4 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,23 @@ where
}

pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
} else {
return Err(CancelError::JobNotFoundError(job_id));
}
} else {
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}

pub fn remove_cancel_hook(job_id: u64) -> Result<(), CancelError> {
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
hook.remove(&job_id);
} else {
return Err(CancelError::JobNotFoundError(job_id));
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ impl<D: Data, T: Debug + Send + 'static> Worker<D, T> {
}

fn release(&mut self) {
self.peer_guard.fetch_sub(1, Ordering::SeqCst);
if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 {
pegasus_memory::alloc::remove_task(self.conf.job_id as usize);
}
if !crate::remove_cancel_hook(self.conf.job_id).is_ok() {
error!("JOB_CANCEL_MAP is poisoned!");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where

async fn cancel(&self, req: Request<pb::CancelRequest>) -> Result<Response<Empty>, Status> {
let pb::CancelRequest { job_id } = req.into_inner();
pegasus::cancel_job(job_id);
let _ = pegasus::cancel_job(job_id);
Ok(Response::new(Empty {}))
}

Expand Down
25 changes: 18 additions & 7 deletions interactive_engine/executor/ir/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,24 @@ impl From<(pb::EdgeExpand, pb::GetV)> for pb::path_expand::ExpandBase {
}

impl pb::QueryParams {
// is_queryable doesn't consider tables as we assume that the table info can be inferred directly from current data.
pub fn is_queryable(&self) -> bool {
!(self.predicate.is_none()
&& self.limit.is_none()
&& self.sample_ratio == 1.0
&& self.columns.is_empty()
&& !self.is_all_columns)
pub fn has_labels(&self) -> bool {
!self.tables.is_empty()
}

pub fn has_columns(&self) -> bool {
!self.columns.is_empty() || self.is_all_columns
}

pub fn has_predicates(&self) -> bool {
self.predicate.is_some()
}

pub fn has_sample(&self) -> bool {
self.sample_ratio != 1.0
}

pub fn has_limit(&self) -> bool {
self.limit.is_some()
}

pub fn is_empty(&self) -> bool {
Expand Down
8 changes: 7 additions & 1 deletion interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,13 @@ fn is_whole_graph(operator: &pb::logical_plan::Operator) -> bool {
&& scan
.params
.as_ref()
.map(|params| !params.is_queryable() && is_params_all_labels(params))
.map(|params| {
!(params.has_columns()
|| params.has_predicates()
|| params.has_sample()
|| params.has_limit())
&& is_params_all_labels(params)
})
.unwrap_or(true)
}
pb::logical_plan::operator::Opr::Root(_) => true,
Expand Down
Loading

0 comments on commit c4dce7d

Please sign in to comment.