Skip to content

Commit

Permalink
[GraphProxy] implement count_vertex() and count_edge() for GrinGraph
Browse files Browse the repository at this point in the history
Committed-by: bingqing.lbq from Dev container
  • Loading branch information
BingqingLyu committed Nov 20, 2023
1 parent 66d87bf commit a88c90f
Showing 1 changed file with 128 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,64 @@ impl GrinGraphProxy {
}
}

/// Count all vertices in the given partitions with specified vertex types.
/// Return the count
pub fn count_all_vertices(
&self, partitions: &Vec<GrinPartitionId>, label_ids: &Vec<GrinVertexTypeId>,
) -> GraphProxyResult<u64> {
let mut count = 0;
// TODO: GRIN needs to specify type ids to scan, while GIE-Gremlin doesn't support if to scan all types.
// For now, we query the types in GraphProxy.
// It should be specified in the `QueryParams` given in query plan later.
let vertex_type_ids =
if label_ids.is_empty() { self.schema.get_all_vertex_type_ids() } else { label_ids.clone() };
for partition in partitions {
if let Some(graph) = self.graphs.get(partition).cloned() {
for type_id in &vertex_type_ids {
unsafe {
count += grin_get_vertex_num_by_type(graph, *type_id);
}
}
} else {
return Err(GraphProxyError::QueryStoreError(format!(
"Partition {:?} is not found in the current process",
partition
)));
}
}

Ok(count as u64)
}

/// Count all edges in the given partitions with specified edge types.
/// Return the count
pub fn count_all_edges(
&self, partitions: &Vec<GrinPartitionId>, label_ids: &Vec<GrinEdgeTypeId>,
) -> GraphProxyResult<u64> {
let mut count = 0;
// TODO: GRIN needs to specify type ids to scan, while GIE-Gremlin doesn't support if to scan all types.
// For now, we query the types in GraphProxy.
// It should be specified in the `QueryParams` given in query plan later.
let vertex_type_ids =
if label_ids.is_empty() { self.schema.get_all_vertex_type_ids() } else { label_ids.clone() };
for partition in partitions {
if let Some(graph) = self.graphs.get(partition).cloned() {
for type_id in &vertex_type_ids {
unsafe {
count += grin_get_edge_num_by_type(graph, *type_id);
}
}
} else {
return Err(GraphProxyError::QueryStoreError(format!(
"Partition {:?} is not found in the current process",
partition
)));
}
}

Ok(count as u64)
}

pub fn get_vertex_by_id(&self, grin_graph: GrinGraph, vertex_id: i64) -> GraphProxyResult<GrinVertex> {
unsafe {
let vertex_ref = grin_deserialize_int64_to_vertex_ref(grin_graph, vertex_id);
Expand Down Expand Up @@ -1341,13 +1399,13 @@ impl GrinGraphRuntime {
GrinGraphRuntime { store, server_partitions, cluster_info }
}

fn assign_worker_partitions(&self) -> GraphProxyResult<Vec<PartitionId>> {
fn assign_worker_partitions(&self) -> GraphProxyResult<Vec<GrinPartitionId>> {
let workers_num = self.cluster_info.get_local_worker_num()?;
let worker_idx = self.cluster_info.get_worker_index()?;
let mut worker_partition_list = vec![];
for pid in &self.server_partitions {
if *pid % workers_num == worker_idx % workers_num {
worker_partition_list.push(*pid as PartitionId)
worker_partition_list.push(*pid as GrinPartitionId)
}
}
debug!(
Expand All @@ -1356,6 +1414,28 @@ impl GrinGraphRuntime {
);
Ok(worker_partition_list)
}

fn encode_storage_vlabel(&self, label: LabelId) -> GrinVertexTypeId {
label as GrinVertexTypeId
}

fn encode_storage_vlabels(&self, labels: &Vec<LabelId>) -> Vec<GrinVertexTypeId> {
labels
.iter()
.map(|label| self.encode_storage_vlabel(*label))
.collect()
}

fn encode_storage_elabel(&self, label: LabelId) -> GrinEdgeTypeId {
label as GrinEdgeTypeId
}

fn encode_storage_elabels(&self, labels: &Vec<LabelId>) -> Vec<GrinEdgeTypeId> {
labels
.iter()
.map(|label| self.encode_storage_elabel(*label))
.collect()
}
}

unsafe impl Send for GrinGraphRuntime {}
Expand All @@ -1368,19 +1448,11 @@ impl ReadGraph for GrinGraphRuntime {
let worker_partitions = self.assign_worker_partitions()?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let label_ids: Vec<GrinVertexTypeId> = params
.labels
.iter()
.map(|label| *label as GrinVertexTypeId)
.collect();
let label_ids = self.encode_storage_vlabels(&params.labels);
let row_filter = params.filter.clone();
let partitions: Vec<GrinPartitionId> = worker_partitions
.iter()
.map(|pid| *pid as GrinPartitionId)
.collect();
let columns = params.columns.clone();
let result_iter = store
.get_all_vertices(partitions.as_ref(), &label_ids)?
.get_all_vertices(worker_partitions.as_ref(), &label_ids)?
.map(move |graph_proxy| {
graph_proxy
.into_runtime_vertex(columns.as_ref())
Expand All @@ -1398,7 +1470,7 @@ impl ReadGraph for GrinGraphRuntime {
) -> GraphProxyResult<Option<RuntimeVertex>> {
// TODO(bingqing): confirm no duplicated vertex in the result
let store = self.store.clone();
let vertex_type_id = label as GrinVertexTypeId;
let vertex_type_id = self.encode_storage_vlabel(label);
let result = store
.get_vertex_by_index(vertex_type_id, primary_key)?
.map(move |vertex_proxy| vertex_proxy.into_runtime_vertex());
Expand All @@ -1413,19 +1485,11 @@ impl ReadGraph for GrinGraphRuntime {
let worker_partitions = self.assign_worker_partitions()?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let label_ids: Vec<GrinEdgeTypeId> = params
.labels
.iter()
.map(|label| *label as GrinEdgeTypeId)
.collect();
let label_ids = self.encode_storage_elabels(&params.labels);
let row_filter = params.filter.clone();
let partitions: Vec<GrinPartitionId> = worker_partitions
.iter()
.map(|pid| *pid as GrinPartitionId)
.collect();
let columns = params.columns.clone();
let result_iter = store
.get_all_edges(partitions.as_ref(), &label_ids)?
.get_all_edges(worker_partitions.as_ref(), &label_ids)?
.map(move |graph_proxy| {
graph_proxy
.into_runtime_edge(columns.as_ref(), true)
Expand Down Expand Up @@ -1475,11 +1539,7 @@ impl ReadGraph for GrinGraphRuntime {
) -> GraphProxyResult<Box<dyn Statement<ID, RuntimeVertex>>> {
let store = self.store.clone();
let row_filter = params.filter.clone();
let edge_type_ids: Vec<GrinEdgeTypeId> = params
.labels
.iter()
.map(|label| *label as GrinVertexTypeId)
.collect();
let edge_type_ids = self.encode_storage_elabels(&params.labels);
let stmt = from_fn(move |vertex_id: ID| match direction {
Direction::Out => {
let neighbor_vertex_iter = store
Expand Down Expand Up @@ -1513,11 +1573,7 @@ impl ReadGraph for GrinGraphRuntime {
) -> GraphProxyResult<Box<dyn Statement<ID, RuntimeEdge>>> {
let store = self.store.clone();
let row_filter = params.filter.clone();
let edge_type_ids: Vec<GrinEdgeTypeId> = params
.labels
.iter()
.map(|label| *label as GrinVertexTypeId)
.collect();
let edge_type_ids = self.encode_storage_elabels(&params.labels);
let columns = params.columns.clone();
let stmt = from_fn(move |vertex_id: ID| {
let columns = columns.clone();
Expand Down Expand Up @@ -1571,11 +1627,46 @@ impl ReadGraph for GrinGraphRuntime {
todo!()
}

fn count_vertex(&self, _params: &QueryParams) -> GraphProxyResult<u64> {
todo!()
fn count_vertex(&self, params: &QueryParams) -> GraphProxyResult<u64> {
if params.filter.is_some() {
// the filter can not be pushed down to store,
// so we need to scan all vertices with filter and then count
Ok(self.scan_vertex(params)?.count() as u64)
} else {
let worker_partitions = self.assign_worker_partitions()?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let label_ids: Vec<GrinVertexTypeId> = self.encode_storage_vlabels(&params.labels);
let count = store.count_all_vertices(&worker_partitions, &label_ids)?;
Ok(count)
} else {
Ok(0)
}
}
}

fn count_edge(&self, _params: &QueryParams) -> GraphProxyResult<u64> {
todo!()
fn count_edge(&self, params: &QueryParams) -> GraphProxyResult<u64> {
#[cfg(feature = "grin_enable_edge_list")]
{
if params.filter.is_some() {
// the filter can not be pushed down to store,
// so we need to scan all vertices with filter and then count
Ok(self.scan_edge(params)?.count() as u64)
} else {
let worker_partitions = self.assign_worker_partitions()?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let label_ids: Vec<GrinVertexTypeId> = self.encode_storage_vlabels(&params.labels);
let count = store.count_all_edges(&worker_partitions, &label_ids)?;
Ok(count)
} else {
Ok(0)
}
}
}
#[cfg(not(feature = "grin_enable_edge_list"))]
{
Err(GraphProxyError::QueryStoreError("Edge list is not enabled".to_string()))
}
}
}

0 comments on commit a88c90f

Please sign in to comment.