diff --git a/src/config/tdb/migrations/DBImportExport_IdentityGraph.gsql b/src/config/tdb/migrations/DBImportExport_IdentityGraph.gsql index 8d132693..b55253dd 100644 --- a/src/config/tdb/migrations/DBImportExport_IdentityGraph.gsql +++ b/src/config/tdb/migrations/DBImportExport_IdentityGraph.gsql @@ -197,6 +197,37 @@ CREATE OR REPLACE QUERY owners_by_ids(SET ids) FOR GRAPH IdentityGraph S END; } +CREATE OR REPLACE QUERY query_keybase_connections(STRING p) FOR GRAPH SocialGraph SYNTAX V2 { + SetAccum @@vlist; + SetAccum @@existing_vlist; + @@vlist += p; + @@existing_vlist = to_vertex_set(@@vlist, "Identities"); // check existence of a vertex without slowing server + seed (ANY) = {@@existing_vlist}; + ListAccum @@keybase_platform = ["twitter", "keybase", "github", "reddit", "mstdn.jp", "lobste.rs", "hackernews"]; + SetAccum @@vertices; + OrAccum @visited = FALSE; + + WHILE(seed.size()>0) LIMIT 2 DO + SetAccum @@pool; + proof = SELECT v FROM seed-((Proof_Forward>|||| ids) FOR GRAPH SocialGraph SYNTAX V2 { + MapAccum, DATETIME> @@expired_time_map; + VertexSet (Identities) = SELECT s FROM Identities:s WHERE s.id IN ids + ACCUM @@expired_time_map += (s -> to_datetime("1970-01-01 00:00:00")); + + address = SELECT addr FROM VertexSet:domain-(( h.expired_at); + PRINT @@expired_time_map as expired_time_map; +} + CREATE OR REPLACE QUERY neighbor_reverse_by_ids(SET ids) FOR GRAPH IdentityGraph SYNTAX V2 { MapAccum, BOOL> @@reverse_map; // Init. diff --git a/src/upstream/keybase/mod.rs b/src/upstream/keybase/mod.rs index bc42ce46..e110acf3 100644 --- a/src/upstream/keybase/mod.rs +++ b/src/upstream/keybase/mod.rs @@ -5,14 +5,16 @@ use crate::config::C; use crate::error::Error; use crate::tigergraph::create_identity_to_identity_proof_two_way_binding; use crate::tigergraph::edge::Proof; -use crate::tigergraph::vertex::Identity; +use crate::tigergraph::vertex::{Identity, IdentityRecord}; +use crate::tigergraph::{BaseResponse, Graph}; use crate::upstream::{DataSource, Fetcher, Platform, ProofLevel, TargetProcessedList}; use crate::util::{make_client, make_http_client, naive_now, parse_body, request_with_timeout}; use async_trait::async_trait; -use serde::Deserialize; - +use http::uri::InvalidUri; use hyper::{Body, Method}; +use serde::{Deserialize, Serialize}; use std::str::FromStr; +use tracing::error; use uuid::Uuid; use super::{DataFetcher, Target}; @@ -74,6 +76,18 @@ pub struct ErrorResponse { pub message: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct QueryKeybaseConnectionsResponse { + #[serde(flatten)] + base: BaseResponse, + results: Option>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +struct KeybaseConnections { + vertices: Vec, +} + #[derive(Default)] pub struct Keybase {} @@ -86,7 +100,7 @@ impl Fetcher for Keybase { match target { Target::Identity(platform, identity) => { - fetch_connections_by_platform_identity(platform, identity).await + fake_fetch_connections_by_platform_identity(platform, identity).await } Target::NFT(_, _, _, _) => todo!(), } @@ -97,6 +111,83 @@ impl Fetcher for Keybase { } } +async fn fake_fetch_connections_by_platform_identity( + platform: &Platform, + identity: &str, +) -> Result { + let mut next_targets: TargetProcessedList = Vec::new(); + + let vid: String = format!("{},{}", platform.to_string(), identity.to_string()); + let encoded_vid = urlencoding::encode(&vid); + + let cli = make_http_client(); + let uri: http::Uri = format!( + "{}/query/{}/query_keybase_connections?p={}", + C.tdb.host, + Graph::SocialGraph.to_string(), + encoded_vid, + ) + .parse() + .map_err(|_err: InvalidUri| { + Error::ParamError(format!( + "QUERY query_keybase_connections?p={} Uri format Error | {}", + encoded_vid, _err + )) + })?; + let req = hyper::Request::builder() + .method(Method::GET) + .uri(uri) + .header("Authorization", Graph::SocialGraph.token()) + .body(Body::empty()) + .map_err(|_err| Error::ParamError(format!("ParamError Error | {}", _err)))?; + + let mut resp = cli.request(req).await.map_err(|err| { + Error::ManualHttpClientError(format!( + "query query_keybase_connections?p={} error | Fail to request: {:?}", + encoded_vid, + err.to_string() + )) + })?; + + let person_info = match parse_body::(&mut resp).await { + Ok(r) => { + if r.base.error { + let err_message = format!( + "TigerGraph query query_keybase_connections error | Code: {:?}, Message: {:?}", + r.base.code, r.base.message + ); + error!(err_message); + return Err(Error::General(err_message, resp.status())); + } + let result = r + .results + .and_then(|results| results.first().cloned()) + .map(|keybase_res| keybase_res.vertices) + .map_or(vec![], |res| { + res.into_iter() + .filter(|target| target.v_id() != vid) + .collect() + }); + result + } + Err(err) => { + let err_message = format!("TigerGraph query owned_by parse_body error: {:?}", err); + error!(err_message); + return Err(Error::General(err_message, resp.status())); + } + }; + + let _ = person_info.iter().map(|info| { + next_targets.push(Target::Identity( + info.platform.clone(), + info.identity.clone(), + )) + }); + + Ok(next_targets) +} + +#[allow(dead_code)] async fn fetch_connections_by_platform_identity( platform: &Platform, identity: &str,