Skip to content

Commit

Permalink
fix(interactive): Not requiring eid in update/delete edges in groot (#…
Browse files Browse the repository at this point in the history
…3240)

- Add API for writing vertices and edges in one batch
- Add eid to the Edge class in client
- Update an not existing edge will create a new edge.
- Update and Delete when eid is not given will try to search for it by
src and dst vertex first.
  • Loading branch information
siyuan0322 authored Sep 21, 2023
1 parent 3d6e176 commit 9f012b7
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 70 deletions.
13 changes: 7 additions & 6 deletions interactive_engine/assembly/src/conf/groot/logback.xml
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
<configuration scan="true" scanPeriod="30 seconds">
<define name="hostname" class="ch.qos.logback.core.property.CanonicalHostNamePropertyDefiner"/>
<property name="log_dir" value="${log.dir:-/var/log/graphscope}"/>
<property name="log_name" value="${log.name:-groot}"/>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log_dir}/${log_name}.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log_dir}/${log_name}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>60</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
<maxHistory>10</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="Metric" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log_dir}/metric.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${log_dir}/metric.log.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxFileSize>100MB</maxFileSize>
<maxHistory>60</maxHistory>
<totalSizeCap>10GB</totalSizeCap>
<maxHistory>10</maxHistory>
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%m%n</pattern>
Expand Down
8 changes: 4 additions & 4 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ fn delete_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operati
}

fn overwrite_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("overwrite_edge");
debug!("overwrite_edge");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let edge_id_pb = parse_pb::<EdgeIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -388,7 +388,7 @@ fn overwrite_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operat
}

fn update_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("update_edge");
debug!("update_edge");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let edge_id_pb = parse_pb::<EdgeIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -405,7 +405,7 @@ fn update_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operation
fn clear_edge_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<()> {
trace!("update_edge");
debug!("clear_edge_properties");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let edge_id_pb = parse_pb::<EdgeIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -420,7 +420,7 @@ fn clear_edge_properties<G: MultiVersionGraph>(
}

fn delete_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("delete_edge");
debug!("delete_edge");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let edge_id_pb = parse_pb::<EdgeIdPb>(data_operation_pb.get_keyBlob())?;
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/common/dyn_type/src/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use std::any::TypeId;
use std::collections::BTreeMap;
use std::io;

use chrono::{Datelike, Timelike};
use pegasus_common::codec::{Decode, Encode, ReadExt, WriteExt};

use crate::{de_dyn_obj, DateTimeFormats, Object, Primitives};
use chrono::{Datelike, Timelike};

impl Encode for Primitives {
fn write_to<W: WriteExt>(&self, writer: &mut W) -> io::Result<()> {
Expand Down
5 changes: 2 additions & 3 deletions interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ use std::fmt;
use std::iter::FromIterator;
use std::rc::Rc;

use fraction::Fraction;
use ir_common::error::ParsePbError;
use ir_common::generated::algebra as pb;
use ir_common::generated::algebra::pattern::binder::Item;
use ir_common::generated::common as common_pb;
use ir_common::{KeyId, LabelId, NameOrId};

use fraction::Fraction;
use vec_map::VecMap;

use crate::error::{IrError, IrResult};
Expand Down Expand Up @@ -1580,7 +1579,7 @@ impl AsLogical for pb::Unfold {
let curr_node = plan_meta.get_curr_node();
let tag_id = get_or_set_tag_id(tag, plan_meta)?;
// plan_meta.set_tag_nodes(tag_id, plan_meta.get_curr_referred_nodes().to_vec());

let tag_nodes = plan_meta.get_tag_nodes(tag_id).to_vec();
plan_meta.refer_to_nodes(curr_node, tag_nodes.clone());
}
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/store/groot/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! limitations under the License.
pub mod condition;
mod elem;
pub mod elem;
mod filter;
mod multi_version;
pub mod prelude;
Expand Down
5 changes: 3 additions & 2 deletions interactive_engine/executor/store/groot/src/db/graph/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,13 @@ impl IntoIterator for EdgeKindScan {
{
Ok(decoder) => {
let edge_kind = self.edge_kind_info.get_type();
Some(Ok(RocksEdgeImpl::new(
let edge = RocksEdgeImpl::new(
edge_id.into(),
edge_kind.into(),
Some(decoder),
raw_val,
)))
);
Some(Ok(edge))
}
Err(e) => Some(Err(e.into())),
}
Expand Down
Loading

0 comments on commit 9f012b7

Please sign in to comment.