diff --git a/cli/src/main.rs b/cli/src/main.rs index a284b5b..d51e2b4 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -3,9 +3,7 @@ use futures::{stream, StreamExt, TryStreamExt}; use ipfs::IpfsClient; use sdk::{ids, pb::grc20}; use sink::{kg::{ - self, - entity::{Entity, EntityNode}, -}, ops::conversions}; + self, mapping::DefaultAttributes,}, ops::conversions}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -46,21 +44,19 @@ async fn main() -> anyhow::Result<()> { unimplemented!() } Command::Describe { id } => { - let entity_node = kg_client - .find_node_by_id::(&id) + let entity = kg_client + .find_node_by_id::(&id) .await? .expect("Entity not found"); - let entity = Entity::from_entity(kg_client.clone(), entity_node.attributes().clone()); + println!("Entity: {}", entity.name_or_id()); - println!("Entity: {}", entity); - - let attributes = entity.attributes().await?; + let attributes = kg_client.attribute_nodes::(entity.id()).await?; for attribute in attributes { - println!("\tAttribute: {}", attribute); - if let Some(value_type) = attribute.value_type().await? { - println!("\t\tValue type: {}", value_type); + println!("\tAttribute: {}", attribute.name_or_id()); + if let Some(value_type) = kg_client.value_type_node::(attribute.id()).await? { + println!("\t\tValue type: {}", value_type.name_or_id()); } } } diff --git a/codegen/src/lib.rs b/codegen/src/lib.rs index 0e73fef..c98e173 100644 --- a/codegen/src/lib.rs +++ b/codegen/src/lib.rs @@ -164,7 +164,7 @@ pub async fn gen_type(kg: &sink::kg::Client, entity: &Node) -> anyhow::Re let typed_attrs = stream::iter(attrs.unique().fix_name_collisions()) .then(|attr| async move { - let value_type = kg.value_type_nodes(attr.id()).await?; + let value_type = kg.value_type_node(attr.id()).await?; Ok::<_, anyhow::Error>((attr, value_type)) }) .try_collect::>() diff --git a/sink/src/events/vote_cast.rs b/sink/src/events/vote_cast.rs index 1a9c3d3..bd4ea45 100644 --- a/sink/src/events/vote_cast.rs +++ b/sink/src/events/vote_cast.rs @@ -6,7 +6,7 @@ use sdk::{ }; use web3_utils::checksum_address; -use crate::{kg::mapping::Relation, neo4j_utils::Neo4jExt}; +use crate::kg::mapping::Relation; use super::{handler::HandlerError, EventHandler}; @@ -24,8 +24,8 @@ impl EventHandler { ) { // Space found (Ok(Some(space)), Ok(_)) | (Ok(None), Ok(Some(space))) => { - let proposal = self.kg.neo4j - .find_one::(neo4rs::query(&format!( + let proposal = self.kg + .find_node::(neo4rs::query(&format!( "MATCH (p:`{PROPOSAL_TYPE}` {{onchain_proposal_id: $onchain_proposal_id}})<-[:`{PROPOSALS}`]-(:`{INDEXED_SPACE}` {{id: $space_id}}) RETURN p", PROPOSAL_TYPE = system_ids::PROPOSAL_TYPE, PROPOSALS = system_ids::PROPOSALS, @@ -38,8 +38,7 @@ impl EventHandler { let account = self .kg - .neo4j - .find_one::( + .find_node::( neo4rs::query(&format!( "MATCH (a:`{ACCOUNT}` {{address: $address}}) RETURN a", ACCOUNT = system_ids::GEO_ACCOUNT, @@ -65,8 +64,8 @@ impl EventHandler { Relation::new( INDEXER_SPACE_ID, &vote_cast.id.clone(), - &account.id, - &proposal.id, + &account.id(), + &proposal.id(), system_ids::VOTE_CAST, vote_cast, ), diff --git a/sink/src/kg/client.rs b/sink/src/kg/client.rs index 22b895f..8eeba77 100644 --- a/sink/src/kg/client.rs +++ b/sink/src/kg/client.rs @@ -632,7 +632,7 @@ impl Client { self.find_nodes::(query).await } - pub async fn value_type_nodes Deserialize<'a> + Send>(&self, id: &str) -> Result>, DatabaseError> { + pub async fn value_type_node Deserialize<'a> + Send>(&self, id: &str) -> Result>, DatabaseError> { let query = neo4rs::query(&format!( r#" MATCH (a {{id: $id}}) -[:`{value_type_attr}`]-> (t:`{type_type}`) @@ -701,28 +701,3 @@ impl Client { Ok(()) } } - -#[derive(Debug, Deserialize)] -pub struct Entity { - pub id: String, - pub name: Option, - pub description: Option, - pub cover: Option, - pub content: Option, -} - -impl Entity { - pub fn new(id: &str, name: &str) -> Self { - Self { - id: id.to_string(), - name: Some(name.to_string()), - description: None, - cover: None, - content: None, - } - } - - pub fn find_by_id_query(id: &str) -> neo4rs::Query { - neo4rs::query("MATCH (n { id: $id }) RETURN n").param("id", id) - } -} diff --git a/sink/src/kg/entity.rs b/sink/src/kg/entity.rs deleted file mode 100644 index 40a6771..0000000 --- a/sink/src/kg/entity.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::fmt::Display; - -use sdk::system_ids; - -use crate::neo4j_utils::Neo4jExt; - -use super::Client; - -pub struct Entity { - kg: Client, - pub id: String, - pub name: String, -} - -impl Display for Entity { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({})", self.name, self.id) - } -} - -impl Entity { - pub fn from_entity(kg: Client, entity_node: EntityNode) -> Self { - Self { - kg, - id: entity_node.id.clone(), - name: entity_node.name.unwrap_or(entity_node.id), - } - } - - pub async fn value_type(&self) -> anyhow::Result> { - let query = neo4rs::query(&format!( - r#" - MATCH (a {{id: $id}}) -[:`{value_type_attr}`]-> (t:`{type_type}`) - WHERE t.id IS NOT NULL AND t.`{name_attr}` IS NOT NULL - RETURN t - "#, - value_type_attr = system_ids::VALUE_TYPE, - type_type = system_ids::SCHEMA_TYPE, - name_attr = system_ids::NAME, - )) - .param("id", self.id.clone()); - - let type_node = self.kg.neo4j.find_one::(query).await?; - - Ok(type_node.map(|node| Self::from_entity(self.kg.clone(), node))) - } - - pub async fn attributes(&self) -> anyhow::Result> { - let query = neo4rs::query(&format!( - r#" - MATCH ({{id: $id}}) -[:`{attr_attr}`]-> (a:`{attr_type}`) - WHERE a.id IS NOT NULL AND a.`{name_attr}` IS NOT NULL - RETURN a - "#, - attr_attr = system_ids::ATTRIBUTES, - attr_type = system_ids::ATTRIBUTE, - name_attr = system_ids::NAME, - )) - .param("id", self.id.clone()); - - let attribute_nodes = self.kg.neo4j.find_all::(query).await?; - - Ok(attribute_nodes - .into_iter() - .map(|node| Entity::from_entity(self.kg.clone(), node)) - .collect::>()) - } -} - -#[derive(Clone, serde::Deserialize)] -pub struct EntityNode { - id: String, - #[serde(default, rename = "a126ca530c8e48d5b88882c734c38935")] - // TODO: Find a way to use system_ids constants - name: Option, -} diff --git a/sink/src/kg/mapping.rs b/sink/src/kg/mapping.rs index 28cce2e..5e14033 100644 --- a/sink/src/kg/mapping.rs +++ b/sink/src/kg/mapping.rs @@ -161,6 +161,10 @@ impl Node { .and_then(|v| v.as_str()) .map(|s| s.to_string()) } + + pub fn name_or_id(&self) -> String { + self.name().unwrap_or_else(|| self.id().to_string()) + } } pub type DefaultAttributes = HashMap; diff --git a/sink/src/kg/mod.rs b/sink/src/kg/mod.rs index 1a862e2..1be1fad 100644 --- a/sink/src/kg/mod.rs +++ b/sink/src/kg/mod.rs @@ -1,5 +1,4 @@ pub mod client; -pub mod entity; pub mod mapping; pub use client::Client; diff --git a/sink/src/neo4j_utils.rs b/sink/src/neo4j_utils.rs index bfe24d5..dbdb7fa 100644 --- a/sink/src/neo4j_utils.rs +++ b/sink/src/neo4j_utils.rs @@ -1,201 +1,4 @@ -use std::collections::HashMap; - -use futures::TryStreamExt; use neo4rs::BoltType; -use serde::{Deserialize, Serialize}; - -/// Extension methods for the Neo4j graph database. -pub trait Neo4jExt { - /// Find a single node or relationship from the given query and attempt to - /// deserialize it into the given type. - fn find_one Deserialize<'a> + Send>( - &self, - query: neo4rs::Query, - ) -> impl std::future::Future>> + Send; - - /// Find all nodes and/or relationships from the given query and attempt to - /// deserialize them into the given type. - /// Note: If the query returns both nodes and relations, neo4j will group the results - /// in tuples of (node, relation). For example: `MATCH (n) -[r]-> () RETURN n, r` will - /// return a list of `{"n": ..., "r": ...}` JSON objects. - fn find_all Deserialize<'a> + Send>( - &self, - query: neo4rs::Query, - ) -> impl std::future::Future>> + Send; - - fn insert_one( - &self, - node: T, - ) -> impl std::future::Future> + Send; - - fn insert_many( - &self, - node: Vec, - ) -> impl std::future::Future>; -} - -impl Neo4jExt for neo4rs::Graph { - async fn find_one Deserialize<'a>>( - &self, - query: neo4rs::Query, - ) -> anyhow::Result> { - Ok(self - .execute(query) - .await? - .next() - .await? - .map(|row| { - tracing::info!("Row: {:?}", row.to::()); - row.to() - }) - .transpose()?) - } - - async fn find_all Deserialize<'a>>( - &self, - query: neo4rs::Query, - ) -> anyhow::Result> { - Ok(self - .execute(query) - .await? - .into_stream_as::() - .try_collect::>() - .await?) - } - - async fn insert_one(&self, node: T) -> anyhow::Result<()> { - let json = serde_json::to_value(&node)?; - - let label = json.get("$type").and_then(|value| value.as_str()); - - let query = if let Some(label) = label { - neo4rs::query(&format!("CREATE (n:{label}) SET n = $node")) - } else { - neo4rs::query("CREATE (n) SET n = $node") - }; - - let query = query.param("node", serde_value_to_bolt(serde_json::to_value(&node)?)); - - self.run(query).await?; - - Ok(()) - } - - async fn insert_many(&self, node: Vec) -> anyhow::Result<()> { - let json = serde_json::to_value(&node)?; - - let label = json.get("$type").and_then(|value| value.as_str()); - - let query = if let Some(label) = label { - neo4rs::query(&format!( - "UNWIND $nodes AS node CREATE (n:{label}) SET n = node" - )) - } else { - neo4rs::query("UNWIND $nodes AS node CREATE (n) SET n = node") - }; - - let query = query.param("nodes", serde_value_to_bolt(json)); - - self.run(query).await?; - - Ok(()) - } -} - -/// Extension methods for Neo4j graph database transactions. -pub trait Neo4jMutExt { - /// Find a single node or relationship from the given query and attempt to - /// deserialize it into the given type. - fn find_one Deserialize<'a> + Send>( - &mut self, - query: neo4rs::Query, - ) -> impl std::future::Future>> + Send; - - /// Find all nodes and/or relationships from the given query and attempt to - /// deserialize them into the given type. - /// Note: If the query returns both nodes and relations, neo4j will group the results - /// in tuples of (node, relation). For example: `MATCH (n) -[r]-> () RETURN n, r` will - /// return a list of `{"n": ..., "r": ...}` JSON objects. - fn find_all Deserialize<'a> + Send>( - &mut self, - query: neo4rs::Query, - ) -> impl std::future::Future>> + Send; - - fn insert_one( - &mut self, - node: T, - ) -> impl std::future::Future> + Send; - - fn insert_many( - &mut self, - node: Vec, - ) -> impl std::future::Future>; -} - -impl Neo4jMutExt for neo4rs::Txn { - async fn find_one Deserialize<'a>>( - &mut self, - query: neo4rs::Query, - ) -> anyhow::Result> { - Ok(self - .execute(query) - .await? - .next(self.handle()) - .await? - .map(|row| row.to()) - .transpose()?) - } - - async fn find_all Deserialize<'a>>( - &mut self, - query: neo4rs::Query, - ) -> anyhow::Result> { - Ok(self - .execute(query) - .await? - .into_stream_as::(self) - .try_collect::>() - .await?) - } - - async fn insert_one(&mut self, node: T) -> anyhow::Result<()> { - let json = serde_json::to_value(&node)?; - - let label = json.get("$type").and_then(|value| value.as_str()); - - let query = if let Some(label) = label { - neo4rs::query(&format!("CREATE (n:{label}) SET n = $node")) - } else { - neo4rs::query("CREATE (n) SET n = $node") - }; - - let query = query.param("node", serde_value_to_bolt(serde_json::to_value(&node)?)); - - self.run(query).await?; - - Ok(()) - } - - async fn insert_many(&mut self, node: Vec) -> anyhow::Result<()> { - let json = serde_json::to_value(&node)?; - - let label = json.get("$type").and_then(|value| value.as_str()); - - let query = if let Some(label) = label { - neo4rs::query(&format!( - "UNWIND $nodes AS node CREATE (n:{label}) SET n = node" - )) - } else { - neo4rs::query("UNWIND $nodes AS node CREATE (n) SET n = node") - }; - - let query = query.param("nodes", serde_value_to_bolt(json)); - - self.run(query).await?; - - Ok(()) - } -} pub fn serde_value_to_bolt(value: serde_json::Value) -> BoltType { match value { diff --git a/sink/src/ops/delete_triple.rs b/sink/src/ops/delete_triple.rs index 7ae1869..1c26429 100644 --- a/sink/src/ops/delete_triple.rs +++ b/sink/src/ops/delete_triple.rs @@ -1,6 +1,4 @@ -use crate::kg::client::Entity; - -use crate::neo4j_utils::Neo4jExt; +use crate::kg::mapping::DefaultAttributes; use crate::ops::KgOp; pub struct DeleteTriple { @@ -11,10 +9,9 @@ pub struct DeleteTriple { impl KgOp for DeleteTriple { async fn apply_op(&self, kg: &crate::kg::client::Client, space_id: &str) -> anyhow::Result<()> { let entity_name = kg - .neo4j - .find_one::(Entity::find_by_id_query(&self.entity_id)) + .find_node_by_id::(&self.entity_id) .await? - .and_then(|entity| entity.name) + .and_then(|entity| entity.name()) .unwrap_or(self.entity_id.to_string()); let attribute_name = kg diff --git a/sink/src/ops/set_triple.rs b/sink/src/ops/set_triple.rs index 5d75347..dad2358 100644 --- a/sink/src/ops/set_triple.rs +++ b/sink/src/ops/set_triple.rs @@ -1,8 +1,6 @@ use sdk::system_ids; -use crate::kg::entity::EntityNode; - -use crate::ops::{KgOp, Value}; +use crate::{kg::mapping::DefaultAttributes, ops::{KgOp, Value}}; pub struct SetTriple { pub entity_id: String, @@ -40,7 +38,7 @@ impl KgOp for SetTriple { match (self.attribute_id.as_str(), &self.value) { (system_ids::TYPES, Value::Entity(value)) => { if kg - .find_relation_by_id::(&self.entity_id) + .find_relation_by_id::(&self.entity_id) .await? .is_some() { @@ -101,7 +99,7 @@ impl KgOp for SetTriple { } if kg - .find_relation_by_id::(&self.entity_id) + .find_relation_by_id::(&self.entity_id) .await? .is_some() { @@ -125,7 +123,7 @@ impl KgOp for SetTriple { } (attribute_id, value) => { if kg - .find_relation_by_id::(&self.entity_id) + .find_relation_by_id::(&self.entity_id) .await? .is_some() {