Skip to content

Commit

Permalink
Merge pull request #29 from Intersubjective/feature/separate_cluster_…
Browse files Browse the repository at this point in the history
…cache_per_context

Separate cluster cache per context
  • Loading branch information
automainint authored Nov 22, 2024
2 parents 665097c + ed474ab commit 3f90379
Show file tree
Hide file tree
Showing 7 changed files with 1,187 additions and 1,094 deletions.
2 changes: 1 addition & 1 deletion psql-connector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmer2"
version = "0.3.20"
version = "0.3.21"
edition = "2021"

[[bin]]
Expand Down
121 changes: 72 additions & 49 deletions psql-connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ DROP FUNCTION IF EXISTS mr_scores;
DROP FUNCTION IF EXISTS mr_graph;
DROP FUNCTION IF EXISTS mr_mutual_scores;
DROP FUNCTION IF EXISTS mr_fetch_new_edges;
DROP FUNCTION IF EXISTS mr_put_edge;
DROP FUNCTION IF EXISTS mr_delete_edge;
DROP FUNCTION IF EXISTS mr_delete_node;
"#,
name = "bootstrap_raw",
bootstrap,
Expand Down Expand Up @@ -476,6 +479,7 @@ fn mr_put_edge(
dst: Option<&str>,
weight: Option<f64>,
context: default!(Option<&str>, "''"),
index: default!(Option<i32>, "-1"),
) -> Result<
TableIterator<
'static,
Expand All @@ -487,8 +491,9 @@ fn mr_put_edge(
let src = src.expect("src should not be null");
let dest = dst.expect("dst should not be null");
let weight = weight.expect("weight should not be null");
let index = index.expect("index should not be null");

let args = rmp_serde::to_vec(&(src, dest, weight))?;
let args = rmp_serde::to_vec(&(src, dest, weight, index))?;

let payload = encode_request(&Command {
id: CMD_PUT_EDGE.to_string(),
Expand All @@ -510,12 +515,14 @@ fn mr_delete_edge(
src: Option<&str>,
dst: Option<&str>,
context: default!(Option<&str>, "''"),
index: default!(Option<i32>, "-1"),
) -> Result<&'static str, Box<dyn Error + 'static>> {
let context = context.unwrap_or("");
let ego = src.expect("src should not be null");
let target = dst.expect("dst should not be null");
let dst = dst.expect("dst should not be null");
let index = index.expect("index should not be null");

let args = rmp_serde::to_vec(&(ego, target))?;
let args = rmp_serde::to_vec(&(ego, dst, index))?;

let payload = encode_request(&Command {
id: CMD_DELETE_EDGE.to_string(),
Expand All @@ -532,11 +539,13 @@ fn mr_delete_edge(
fn mr_delete_node(
src: Option<&str>,
context: default!(Option<&str>, "''"),
index: default!(Option<i32>, "-1"),
) -> Result<&'static str, Box<dyn Error + 'static>> {
let context = context.unwrap_or("");
let ego = src.expect("src should not be null");
let index = index.expect("index should not be null");

let args = rmp_serde::to_vec(&(ego))?;
let args = rmp_serde::to_vec(&(ego, index))?;

let payload = encode_request(&Command {
id: CMD_DELETE_NODE.to_string(),
Expand Down Expand Up @@ -701,11 +710,11 @@ mod tests {
for _ in 0..3000 {
let _ = crate::mr_reset().unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();
}
}
Expand Down Expand Up @@ -839,7 +848,7 @@ mod tests {
let _ = crate::mr_sync(Some(1000)).unwrap();

let res =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None, Some(-1)).unwrap();

let n = res
.map(|x| {
Expand All @@ -859,7 +868,7 @@ mod tests {
let _ = crate::mr_sync(Some(1000)).unwrap();

let res =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), Some("X"), Some(-1)).unwrap();

let n = res
.map(|x| {
Expand All @@ -877,7 +886,7 @@ mod tests {
fn create_context() {
let _ = crate::mr_reset().unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None, Some(-1)).unwrap();
let _ = crate::mr_create_context(Some("X"));
let _ = crate::mr_sync(Some(1000)).unwrap();

Expand All @@ -901,9 +910,9 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_edgelist(None).unwrap();
Expand All @@ -926,10 +935,10 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y")).unwrap();
let _ = crate::mr_delete_edge(Some("B1"), Some("B2"), Some("X")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y"), Some(-1)).unwrap();
let _ = crate::mr_delete_edge(Some("B1"), Some("B2"), Some("X"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

// We should still have "Y" edge.
Expand All @@ -947,19 +956,33 @@ mod tests {
assert_eq!(n, 1);
}

#[pg_test]
fn delete_nodes() {
let _ = crate::mr_reset().unwrap();

let _ = crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), None, Some(-1)).unwrap();
let _ = crate::mr_delete_node(Some("B1"), None, Some(-1)).unwrap();
let _ = crate::mr_delete_node(Some("B2"), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_edgelist(None).unwrap();

assert_eq!(res.count(), 0);
}

#[pg_test]
fn null_context_invariant() {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y")).unwrap();
crate::mr_put_edge(Some("B1"), Some("B2"), Some(2.0), Some("Y"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

// Delete and put back again.
let _ = crate::mr_delete_edge(Some("B1"), Some("B2"), Some("X"));
let _ = crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X"));
let _ = crate::mr_delete_edge(Some("B1"), Some("B2"), Some("X"), Some(-1));
let _ = crate::mr_put_edge(Some("B1"), Some("B2"), Some(1.0), Some("X"), Some(-1));
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_edgelist(None).unwrap();
Expand All @@ -981,11 +1004,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U3"), Some("U2"), Some(3.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U3"), Some("U2"), Some(3.0), Some("X"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_node_score(Some("U1"), Some("U2"), Some("X")).unwrap();
Expand All @@ -1010,11 +1033,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some(""), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some(""), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some("")).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some(""), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> = crate::mr_scores(
Expand Down Expand Up @@ -1063,11 +1086,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some("X"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> = crate::mr_scores(
Expand Down Expand Up @@ -1116,11 +1139,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), Some("X"), Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some("X")).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), Some("X"), Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> = crate::mr_scores(
Expand Down Expand Up @@ -1169,11 +1192,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> = crate::mr_nodelist(None).unwrap().collect();
Expand All @@ -1190,11 +1213,11 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(3.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> = crate::mr_connected(Some("U1"), None).unwrap().collect();
Expand All @@ -1212,17 +1235,17 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(3.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U3"), Some(1.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U1"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("U1"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("U3"), Some(4.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("U3"), Some(4.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U3"), Some("U1"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U3"), Some("U1"), Some(3.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U3"), Some("U2"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U3"), Some("U2"), Some(2.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res: Vec<_> =
Expand Down Expand Up @@ -1277,7 +1300,7 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None, Some(-1)).unwrap();

assert_eq!(
crate::mr_fetch_new_edges(Some("U1"), Some("B"))
Expand All @@ -1287,9 +1310,9 @@ mod tests {
);

let _ =
crate::mr_put_edge(Some("U1"), Some("B3"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("B3"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("B4"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("B4"), Some(3.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let res = crate::mr_fetch_new_edges(Some("U1"), Some("B")).unwrap();
Expand All @@ -1313,7 +1336,7 @@ mod tests {
let _ = crate::mr_reset().unwrap();

let _ =
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("U2"), Some(1.0), None, Some(-1)).unwrap();

assert_eq!(
crate::mr_fetch_new_edges(Some("U1"), Some("B"))
Expand All @@ -1323,9 +1346,9 @@ mod tests {
);

let _ =
crate::mr_put_edge(Some("U1"), Some("B3"), Some(2.0), None).unwrap();
crate::mr_put_edge(Some("U1"), Some("B3"), Some(2.0), None, Some(-1)).unwrap();
let _ =
crate::mr_put_edge(Some("U2"), Some("B4"), Some(3.0), None).unwrap();
crate::mr_put_edge(Some("U2"), Some("B4"), Some(3.0), None, Some(-1)).unwrap();
let _ = crate::mr_sync(Some(1000)).unwrap();

let filter: Vec<u8> = crate::mr_get_new_edges_filter(Some("U1")).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion psql-connector/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ fn put_edge_(
dst: &str,
weight: f64,
) {
let _ = crate::mr_put_edge(Some(src), Some(dst), Some(weight), None).unwrap();
let _ = crate::mr_put_edge(Some(src), Some(dst), Some(weight), None, Some(-1)).unwrap();
}

pub fn put_testing_edges() {
Expand Down
2 changes: 1 addition & 1 deletion service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "meritrank_service"
version = "0.2.27"
version = "0.2.28"
edition = "2021"

[features]
Expand Down
Loading

0 comments on commit 3f90379

Please sign in to comment.