Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into gie-grin
Browse files Browse the repository at this point in the history
Committed-by: bingqing.lbq from Dev container
  • Loading branch information
BingqingLyu committed Nov 21, 2023
2 parents aa82749 + bd9adcc commit a2388b5
Show file tree
Hide file tree
Showing 35 changed files with 153 additions and 54 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker/graphscope-store.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update -y && \
apt-get install -y sudo default-jdk tzdata && \
apt-get clean -y && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
############### RUNTIME: frontend && executor #######################
FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

ADD artifacts/artifacts.tar.gz /opt/graphscope/

RUN apt-get update -y && \
Expand Down
10 changes: 9 additions & 1 deletion charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,13 @@ data:
export LOG_NAME=graphscope-store
export GROOT_CONF_FILE=/etc/groot/groot.config
${GRAPHSCOPE_HOME}/groot/bin/store_ctl.sh start ${ROLE}
# For core and heap profiling
# ulimit -c unlimited
# sudo mkdir -p /apsara/cloud/data/corefile/ && sudo chown -R graphscope:graphscope /apsara/cloud/data/corefile/
# export _RJEM_MALLOC_CONF=prof:true,lg_prof_interval:32,lg_prof_sample:19
# export MALLOC_CONF=prof:true,lg_prof_interval:32
export RUST_BACKTRACE=1
${GRAPHSCOPE_HOME}/groot/bin/store_ctl.sh start ${ROLE} # || sleep infinity
{{- end -}}
2 changes: 2 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)
Return count(author);
2 changes: 2 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)<-[:REPLYOF]-(msg2:POST|COMMENT)
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)
where msg1.length > $len
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q1.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (author:PERSON)<-[:HASCREATOR]-(msg1:POST|COMMENT)<-[:REPLYOF]-(msg2:POST|COMMENT)
where msg2.length > $len
Return count(author);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[:KNOWS]->(p2:PERSON)
Where p1.id = $id1 and p2.id = $id2
Return count(p1);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[:KNOWS]->(p2:PERSON)-[:LIKES]->(comment:COMMENT)
Where p1.id = $id1 and p2.id = $id2 and comment.length > $len
Return count(p1);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[k:KNOWS]->(p2:PERSON)
Where k.creationDate > $date1 and k.creationDate < $date2
Return count(p2);
3 changes: 3 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q2.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Match (p1:PERSON)-[k:KNOWS]->(p2:PERSON)-[:LIKES]->(comment:COMMENT)
Where k.creationDate > $date1 and k.creationDate < $date2
Return count(p1);
4 changes: 4 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.1.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Match (message:COMMENT|POST)-[:HASCREATOR]->(person:PERSON),
(message:COMMENT|POST)-[:HASTAG]->(tag:TAG),
(person:PERSON) -[:HASINTEREST]->(tag:TAG)
Return count(person);
5 changes: 5 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.2.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Match (person1:PERSON)-[:LIKES]->(message:COMMENT|POST),
(message:COMMENT|POST)-[:HASCREATOR]->(person2:PERSON),
(person1:PERSON)-[:ISLOCATEDIN]->(place:PLACE),
(person2:PERSON) -[:ISLOCATEDIN]->(place:PLACE)
Return count(person1);
5 changes: 5 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.3.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Match (person1:PERSON)<-[:HASCREATOR]-(comment:COMMENT),
(comment:COMMENT)-[:REPLYOF]->(post:POST),
(post:POST)<-[:CONTAINEROF]-(forum:FORUM),
(forum:FORUM)-[:HASMEMBER]->(person2:PERSON)
Return count(person1);
7 changes: 7 additions & 0 deletions flex/resources/queries/examples/store_procedure/Q3.4.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Match (forum:FORUM)-[:CONTAINEROF]->(post:POST),
(forum:FORUM)-[:HASMEMBER]->(person1:PERSON),
(forum:FORUM)-[:HASMEMBER]->(person2:PERSON),
(person1:PERSON)-[:KNOWS]->(person2:PERSON),
(person1:PERSON)-[:LIKES]->(post:POST),
(person2:PERSON)-[:LIKES]->(post:POST)
Return count(person1);
4 changes: 4 additions & 0 deletions interactive_engine/executor/assembly/groot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ runtime = {path = "../../ir/runtime"}
graph_proxy = {path = "../../ir/graph_proxy", features = ["with_global_query"]}
log4rs = "1.2"
tokio = { version = "1.24", features = ["macros", "sync"] }
tikv-jemallocator = {version = "0.5", default_features=false, features = ["profiling", "disable_initial_exec_tls"] }

[features]
column_filter_push_down = []

[profile.dev]
# TODO(siyuan): re-enable debug assertions by addressing the reports for misaligned pointer dereferences https://github.com/rust-lang/rust/pull/98112/
debug-assertions = false

[profile.release]
debug = true
4 changes: 4 additions & 0 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ use crate::store::jna_response::JnaResponse;
pub type GraphHandle = *const c_void;
pub type PartitionGraphHandle = *const c_void;
pub type FfiPartitionGraph = WrapperPartitionGraph<GraphStore>;
use tikv_jemallocator::Jemalloc;

#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

static INIT: Once = Once::new();

Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ else
fi

if [ "$TARGET" = "groot" ]; then
strip ${STRIP_OPTION} $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX}
# strip ${STRIP_OPTION} $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX}
ln -sf $(pwd)/target/${MODE}/libgroot_ffi.${SUFFIX} $(pwd)/target/libgroot_ffi.${SUFFIX}
fi
4 changes: 3 additions & 1 deletion interactive_engine/executor/engine/pegasus/common/src/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ impl<T: std::fmt::Debug> std::fmt::Debug for RcPointer<T> {
impl<T: ?Sized> Drop for RcPointer<T> {
fn drop(&mut self) {
if self.count.fetch_sub(1, Ordering::SeqCst) == 1 {
unsafe { std::ptr::drop_in_place(self.ptr.as_ptr()) }
unsafe {
let _ = Box::<T>::from_raw(self.ptr.as_ptr());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ impl From<std::io::Error> for StartupError {
#[derive(Debug)]
pub enum CancelError {
JobNotFoundError(u64),
CancelMapPoisonedError,
}

impl Display for CancelError {
Expand All @@ -351,6 +352,9 @@ impl Display for CancelError {
CancelError::JobNotFoundError(e) => {
write!(f, "fail to find job, job id: {};", e)
}
CancelError::CancelMapPoisonedError => {
write!(f, "JOB_CANCEL_MAP is poisoned!;")
}
}
}
}
Expand Down
20 changes: 16 additions & 4 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,23 @@ where
}

pub fn cancel_job(job_id: u64) -> Result<(), CancelError> {
let mut hook = JOB_CANCEL_MAP.write().expect("lock poisoned");
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
if let Some(cancel_hook) = hook.get_mut(&job_id) {
cancel_hook.store(true, Ordering::SeqCst);
} else {
return Err(CancelError::JobNotFoundError(job_id));
}
} else {
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}

pub fn remove_cancel_hook(job_id: u64) -> Result<(), CancelError> {
if let Ok(mut hook) = JOB_CANCEL_MAP.write() {
hook.remove(&job_id);
} else {
return Err(CancelError::JobNotFoundError(job_id));
return Err(CancelError::CancelMapPoisonedError);
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ impl<D: Data, T: Debug + Send + 'static> Worker<D, T> {
}

fn release(&mut self) {
self.peer_guard.fetch_sub(1, Ordering::SeqCst);
if self.peer_guard.fetch_sub(1, Ordering::SeqCst) == 1 {
pegasus_memory::alloc::remove_task(self.conf.job_id as usize);
}
if !crate::remove_cancel_hook(self.conf.job_id).is_ok() {
error!("JOB_CANCEL_MAP is poisoned!");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where

async fn cancel(&self, req: Request<pb::CancelRequest>) -> Result<Response<Empty>, Status> {
let pb::CancelRequest { job_id } = req.into_inner();
pegasus::cancel_job(job_id);
let _ = pegasus::cancel_job(job_id);
Ok(Response::new(Empty {}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ use crate::db::api::{GraphError, GraphResult};
#[derive(Clone)]
pub struct UnsafeBytesReader<'a> {
buf: *const u8,
len: usize,
_phantom: PhantomData<&'a ()>,
}

impl<'a> UnsafeBytesReader<'a> {
pub fn new(buf: &[u8]) -> Self {
UnsafeBytesReader { buf: buf.as_ptr(), _phantom: Default::default() }
UnsafeBytesReader { buf: buf.as_ptr(), len: buf.len(), _phantom: Default::default() }
}

pub fn read_u8(&self, offset: usize) -> u8 {
Expand Down Expand Up @@ -55,6 +56,9 @@ impl<'a> UnsafeBytesReader<'a> {
pub fn read_bytes(&self, offset: usize, len: usize) -> &'a [u8] {
unsafe { ::std::slice::from_raw_parts(self.buf.offset(offset as isize), len) }
}
pub fn len(&self) -> usize {
self.len
}
}

/// This writer won't check whether the offset is overflow when write bytes.
Expand Down
11 changes: 11 additions & 0 deletions interactive_engine/executor/store/groot/src/db/graph/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,19 @@ impl Decoder {
if idx > self.src.fixed_len_prop_count {
start_off = bytes_to_len(reader.read_bytes(self.src.offsets[idx - 1], 3));
}
if end_off <= start_off {
error!("fatal error! This codec cannot decode the bytes: idx {}, end_off: {}, start_off: {}, props: {:?}, ", idx, end_off, start_off, self.src.props);
return None;
}
let len = end_off - start_off;
let start_off = start_off + self.src.var_len_prop_start_offset;
if start_off + len > reader.len() {
error!(
"fatal error! This codec cannot decode the bytes: idx {}, len: {}, props: {:?}, ",
idx, len, self.src.props
);
return None;
}
let bytes = reader.read_bytes(start_off, len);
let info = &self.src.props[idx];
let ret = ValueRef::new(info.r#type, bytes);
Expand Down
39 changes: 11 additions & 28 deletions interactive_engine/executor/store/groot/src/db/graph/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ impl MultiVersionGraph for GraphStore {
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
match res_unwrap!(self.get_vertex_data(si, id, &info), insert_update_vertex, si, id, label)? {
Some(data) => {
let data = data.as_slice();
let version = get_codec_version(data);
let decoder = info.get_decoder(si, version)?;
let mut old = decoder.decode_all(data);
Expand All @@ -400,6 +401,7 @@ impl MultiVersionGraph for GraphStore {
self.check_si_guard(si)?;
let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?;
if let Some(data) = self.get_vertex_data(si, id, &info)? {
let data = data.as_slice();
let version = get_codec_version(data);
let decoder = info.get_decoder(si, version)?;
let mut old = decoder.decode_all(data);
Expand Down Expand Up @@ -457,43 +459,23 @@ impl MultiVersionGraph for GraphStore {
)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };

let mut data_res = self.get_edge_data(si, id, &info, direction)?;
let mut complete_id = id;
match data_res {
Some(_) => {
debug!("Edge exists");
}
None => {
// inner_id != 0 && no edge found using this edge id. it may be next edge id, or an useless edge id to be discarded.
let edge_id =
self.get_eid_by_vertex(si, edge_kind.edge_label_id, id.src_id, id.dst_id, forward);
match edge_id {
Some(edge_id) => {
complete_id = edge_id;
data_res = self.get_edge_data(si, edge_id, &info, direction)?;
}
None => {
// will create new edge
warn!("Edge doesn't exists, will create new edge");
}
}
}
}
let data_res = self.get_edge_data(si, id, &info, direction)?;

match data_res {
Some(data) => {
let data = data.as_slice();
let version = get_codec_version(data);
let decoder = info.get_decoder(si, version)?;
let mut old = decoder.decode_all(data);
merge_updates(&mut old, properties);
let res = self
.do_insert_edge_data(si, complete_id, info, direction, &old)
.do_insert_edge_data(si, id, info, direction, &old)
.map(|_| self.update_si_guard(si));
res_unwrap!(res, insert_update_edge, si, id, edge_kind)
}
None => {
let res = self
.do_insert_edge_data(si, complete_id, info, direction, properties)
.do_insert_edge_data(si, id, info, direction, properties)
.map(|_| self.update_si_guard(si));
res_unwrap!(res, insert_update_edge, si, id, edge_kind)
}
Expand Down Expand Up @@ -529,6 +511,7 @@ impl MultiVersionGraph for GraphStore {
)?;
let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In };
if let Some(data) = self.get_edge_data(si, complete_id, &info, direction)? {
let data = data.as_slice();
let version = get_codec_version(data);
let decoder = info.get_decoder(si, version)?;
let mut old = decoder.decode_all(data);
Expand Down Expand Up @@ -675,14 +658,14 @@ impl GraphStore {

fn get_vertex_data(
&self, si: SnapshotId, id: VertexId, info: &VertexTypeInfoRef,
) -> GraphResult<Option<&[u8]>> {
) -> GraphResult<Option<Vec<u8>>> {
debug!("get_vertex_data");
if let Some(table) = info.get_table(si) {
let key = vertex_key(table.id, id, si - table.start_si);
let mut iter = self.storage.scan_from(&key)?;
if let Some((k, v)) = iter.next() {
if k.len() == key.len() && k[0..16] == key[0..16] && v.len() >= 4 {
let ret = unsafe { std::mem::transmute(v) };
let ret = v.to_vec();
return Ok(Some(ret));
}
}
Expand All @@ -692,15 +675,15 @@ impl GraphStore {

fn get_edge_data(
&self, si: SnapshotId, id: EdgeId, info: &EdgeKindInfoRef, direction: EdgeDirection,
) -> GraphResult<Option<&[u8]>> {
) -> GraphResult<Option<Vec<u8>>> {
debug!("get_edge_data");
if let Some(table) = info.get_table(si) {
let ts = si - table.start_si;
let key = edge_key(table.id, id, direction, ts);
let mut iter = self.storage.scan_from(&key)?;
if let Some((k, v)) = iter.next() {
if k.len() == key.len() && k[0..32] == key[0..32] && v.len() >= 4 {
let ret = unsafe { std::mem::transmute(v) };
let ret = v.to_vec();
return Ok(Some(ret));
}
}
Expand Down
16 changes: 15 additions & 1 deletion interactive_engine/executor/store/groot/src/db/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ pub struct RawBytes {

impl RawBytes {
pub fn new(slice: &[u8]) -> Self {
RawBytes { ptr: slice.as_ptr(), len: slice.len() }
let tmp = slice.to_vec();
let ptr = tmp.as_ptr();
let len = tmp.len();
::std::mem::forget(tmp);
RawBytes { ptr, len }
}

pub fn empty() -> Self {
Expand All @@ -71,6 +75,16 @@ impl RawBytes {
}
}

impl Drop for RawBytes {
fn drop(&mut self) {
if self.len > 0 {
unsafe {
Vec::from_raw_parts(self.ptr as *mut u8, self.len, self.len);
}
}
}
}

impl<'a> Iterator for StorageIter<'a> {
type Item = KvPair;

Expand Down
Loading

0 comments on commit a2388b5

Please sign in to comment.