Skip to content

Commit

Permalink
fix #111: add reverse field for neighborWithTraversal
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhongFuze committed Dec 4, 2023
1 parent ee58223 commit c6b4521
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 11 deletions.
11 changes: 8 additions & 3 deletions src/bin/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use relation_server::{
config::C,
controller::tigergraphql::Query,
error::Result,
tigergraph::vertex::{ContractLoadFn, IdentityLoadFn, OwnerLoadFn},
tigergraph::vertex::{ContractLoadFn, IdentityLoadFn, NeighborReverseLoadFn, OwnerLoadFn},
util::make_http_client,
};
use std::{convert::Infallible, net::SocketAddr};
Expand Down Expand Up @@ -47,22 +47,27 @@ async fn main() -> Result<()> {
let owner_loader_fn = OwnerLoadFn {
client: client.to_owned(),
};

let neighbor_reverse_loader_fn = NeighborReverseLoadFn {
client: client.to_owned(),
};
let contract_loader = Loader::new(contract_loader_fn)
.with_max_batch_size(500)
.with_yield_count(100);
let identity_loader = Loader::new(identity_loader_fn)
.with_max_batch_size(500)
.with_yield_count(100);

let owner_loader = Loader::new(owner_loader_fn)
.with_max_batch_size(500)
.with_yield_count(100);
let neighbor_reverse_loader = Loader::new(neighbor_reverse_loader_fn)
.with_max_batch_size(500)
.with_yield_count(100);

let schema = Schema::build(Query::default(), EmptyMutation, EmptySubscription)
.data(contract_loader)
.data(identity_loader)
.data(owner_loader)
.data(neighbor_reverse_loader)
.finish();

let graphql_post = async_graphql_warp::graphql(schema)
Expand Down
22 changes: 17 additions & 5 deletions src/config/tdb/migrations/DBImportExport_IdentityGraph.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,23 @@ CREATE OR REPLACE QUERY owners_by_ids(SET<STRING> ids) FOR GRAPH IdentityGraph S
END;
}

CREATE OR REPLACE QUERY neighbor_reverse_by_ids(SET<STRING> ids) FOR GRAPH IdentityGraph SYNTAX V2 {
MapAccum<VERTEX<Identities>, BOOL> @@reverse_map;
// Init.
VertexSet (Identities) = SELECT s FROM Identities:s WHERE s.id IN ids
ACCUM @@reverse_map += (s -> false);
ListAccum<STRING> @@domainSystems = ["dotbit", "lens", "unstoppabledomains", "space_id", "crossbell"];

address = SELECT addr FROM VertexSet:domain-((<Reverse_Resolve):r)-Identities:addr
WHERE @@domainSystems.contains(r.system) == TRUE
ACCUM @@reverse_map += (domain -> true);

address2 = SELECT addr FROM VertexSet:addr-((Reverse_Resolve_Contract>):r)-Contracts:c
WHERE r.system == "ENS"
ACCUM @@reverse_map += (addr -> true);
PRINT @@reverse_map as reverse_map;
}


CREATE OR REPLACE QUERY identities_by_ids(SET<STRING> ids) FOR GRAPH IdentityGraph SYNTAX V2 {
vertices = SELECT s FROM Identities:s WHERE s.id IN ids;
Expand Down Expand Up @@ -321,11 +338,6 @@ CREATE OR REPLACE QUERY contracts_by_ids(SET<STRING> ids) FOR GRAPH IdentityGrap
PRINT vertices;
}

CREATE OR REPLACE QUERY owners_by_ids(SET<STRING> ids) FOR GRAPH IdentityGraph SYNTAX V2 {
vertices = SELECT s FROM Identities:s WHERE s.id IN ids;
PRINT vertices;
}

CREATE OR REPLACE QUERY expand(VERTEX<Identities> p, INT depth) FOR GRAPH IdentityGraph {
SetAccum<EDGE> @@edges;
OrAccum @visited = FALSE;
Expand Down
30 changes: 29 additions & 1 deletion src/controller/tigergraphql/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use crate::{
tigergraph::{
delete_vertex_and_edge,
edge::{resolve::ResolveReverse, EdgeUnion, HoldRecord},
vertex::{Identity, IdentityRecord, IdentityWithSource, OwnerLoadFn},
vertex::{
Identity, IdentityRecord, IdentityWithSource, NeighborReverseLoadFn, OwnerLoadFn,
},
},
upstream::{fetch_all, ContractCategory, DataSource, Platform, Target},
util::make_http_client,
Expand Down Expand Up @@ -172,6 +174,31 @@ impl IdentityRecord {
self.resolve_reverse_domains(&client).await
}

/// reverse flag can be used as a filtering for Identity which type is domain system.
/// If `reverse=None` if omitted, there is no need to filter anything.
/// When `reverse=true`, just return `primary domain` related identities.
/// When `reverse=false`, Only `non-primary domain` will be returned, which is the inverse set of reverse=true.
async fn reverse(&self, ctx: &Context<'_>) -> Result<Option<bool>> {
if !vec![
Platform::Lens,
Platform::Dotbit,
Platform::UnstoppableDomains,
Platform::SpaceId,
Platform::Crossbell,
Platform::Ethereum, // ENS
]
.contains(&self.platform)
{
return Ok(None);
}
let loader: &Loader<String, Option<bool>, NeighborReverseLoadFn> =
ctx.data().map_err(|err| Error::GraphQLError(err.message))?;
match loader.load(self.v_id.clone()).await {
Some(value) => Ok(Some(value)),
None => Ok(None),
}
}

/// there's only `platform: lens, dotbit, unstoppabledomains, farcaster, space_id` identity `ownedBy` is not null
async fn owned_by(&self, ctx: &Context<'_>) -> Result<Option<IdentityRecord>> {
if !vec![
Expand All @@ -180,6 +207,7 @@ impl IdentityRecord {
Platform::UnstoppableDomains,
Platform::Farcaster,
Platform::SpaceId,
Platform::Crossbell,
]
.contains(&self.platform)
{
Expand Down
82 changes: 81 additions & 1 deletion src/tigergraph/vertex/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,10 @@ impl IdentityRecord {
}
}

pub struct NeighborReverseLoadFn {
pub client: Client<HttpConnector>,
}

pub struct OwnerLoadFn {
pub client: Client<HttpConnector>,
}
Expand Down Expand Up @@ -1013,6 +1017,18 @@ struct OwnerQueryIdResult {
identity: Vec<IdentityRecord>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct NeighborReverseMapResponse {
#[serde(flatten)]
base: BaseResponse,
results: Option<Vec<NeighborReverseMapResult>>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct NeighborReverseMapResult {
reverse_map: HashMap<String, Option<bool>>,
}

#[async_trait::async_trait]
impl BatchFn<String, Option<IdentityRecord>> for IdentityLoadFn {
async fn load(&mut self, ids: &[String]) -> HashMap<String, Option<IdentityRecord>> {
Expand All @@ -1039,6 +1055,71 @@ impl BatchFn<String, Option<IdentityRecord>> for OwnerLoadFn {
}
}

#[async_trait::async_trait]
impl BatchFn<String, Option<bool>> for NeighborReverseLoadFn {
async fn load(&mut self, ids: &[String]) -> HashMap<String, Option<bool>> {
tracing::info!("Loading ids for neighbor_reverse_by_ids {:?}", ids);
let records = get_neighbor_reverse_by_ids(&self.client, ids.to_vec()).await;
match records {
Ok(records) => records,
// HOLD ON: Not sure if `Err` need to return
Err(_) => ids.iter().map(|k| (k.to_owned(), None)).collect(),
}
}
}

async fn get_neighbor_reverse_by_ids(
client: &Client<HttpConnector>,
ids: Vec<String>,
) -> Result<HashMap<String, Option<bool>>, Error> {
let uri: http::Uri = format!(
"{}/query/{}/neighbor_reverse_by_ids",
C.tdb.host,
Graph::IdentityGraph.to_string()
)
.parse()
.map_err(|_err: InvalidUri| Error::ParamError(format!("Uri format Error {}", _err)))?;
let payload = VertexIds { ids };
let json_params = serde_json::to_string(&payload).map_err(|err| Error::JSONParseError(err))?;
let req = hyper::Request::builder()
.method(Method::POST)
.uri(uri)
.header("Authorization", Graph::IdentityGraph.token())
.body(Body::from(json_params))
.map_err(|_err| Error::ParamError(format!("ParamError Error {}", _err)))?;
let mut resp = client.request(req).await.map_err(|err| {
Error::ManualHttpClientError(format!(
"TigerGraph | Fail to request neighbor_reverse_by_ids: {:?}",
err.to_string()
))
})?;
match parse_body::<NeighborReverseMapResponse>(&mut resp).await {
Ok(r) => {
if r.base.error {
let err_message = format!(
"TigerGraph neighbor_reverse_by_ids error | Code: {:?}, Message: {:?}",
r.base.code, r.base.message
);
error!(err_message);
return Err(Error::General(err_message, resp.status()));
}
let result = r
.results
.and_then(|results| results.first().cloned())
.map_or(HashMap::new(), |res| res.reverse_map);
Ok(result)
}
Err(err) => {
let err_message = format!(
"TigerGraph neighbor_reverse_by_ids parse_body error: {:?}",
err
);
error!(err_message);
return Err(err);
}
}
}

async fn get_owners_by_ids(
client: &Client<HttpConnector>,
ids: Vec<String>,
Expand All @@ -1052,7 +1133,6 @@ async fn get_owners_by_ids(
.map_err(|_err: InvalidUri| Error::ParamError(format!("Uri format Error {}", _err)))?;
let payload = VertexIds { ids };
let json_params = serde_json::to_string(&payload).map_err(|err| Error::JSONParseError(err))?;
trace!("owners_by_ids {}", json_params);
let req = hyper::Request::builder()
.method(Method::POST)
.uri(uri)
Expand Down
3 changes: 2 additions & 1 deletion src/tigergraph/vertex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ pub mod identity;
use async_trait::async_trait;
pub use contract::{Contract, ContractLoadFn, ContractRecord};
pub use identity::{
Identity, IdentityLoadFn, IdentityRecord, IdentityWithSource, NeighborsResponse, OwnerLoadFn,
Identity, IdentityLoadFn, IdentityRecord, IdentityWithSource, NeighborReverseLoadFn,
NeighborsResponse, OwnerLoadFn,
};
use serde::{Deserialize, Serialize};

Expand Down

0 comments on commit c6b4521

Please sign in to comment.