Skip to content

Commit

Permalink
Feature/social graph
Browse files Browse the repository at this point in the history
Merge pull request #134 from ZhongFuze/feature/social-graph
  • Loading branch information
ZhongFuze authored Apr 25, 2024
2 parents 39a3c17 + f542bd0 commit 28ca354
Show file tree
Hide file tree
Showing 11 changed files with 855 additions and 44 deletions.
79 changes: 40 additions & 39 deletions .github/workflows/container.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,51 @@ name: Build container
on:
push:
branches:
- main
- develop
# DEBUG
- graphdb/tigergraph
- main
- develop
# DEBUG
- graphdb/tigergraph
- feature/social-graph

jobs:
build-image:
if: (github.ref_name == 'main' || github.ref_name == 'develop' || github.ref_name == 'graphdb/tigergraph') && github.repository == 'nextdotid/relation_server'
if: (github.ref_name == 'main' || github.ref_name == 'develop' || github.ref_name == 'feature/social-graph') && github.repository == 'nextdotid/relation_server'
runs-on: ubuntu-latest
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Log in to Container registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Docker BuildX
uses: docker/setup-buildx-action@v2
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-docker-build-${{ hashFiles('**/Cargo.lock') }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: '.'
push: true
file: Dockerfile
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
- name: Checkout
uses: actions/checkout@v3
- name: Log in to Container registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Docker BuildX
uses: docker/setup-buildx-action@v2
- name: Cache Docker layers
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-docker-build-${{ hashFiles('**/Cargo.lock') }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: "."
push: true
file: Dockerfile
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new,mode=max
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
171 changes: 171 additions & 0 deletions src/config/tdb/migrations/LoadingJob_SocialGraph.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,177 @@ CREATE OR REPLACE QUERY find_identity_graph(STRING p, INT reverse_flag=0) FOR GR
END;
}

CREATE OR REPLACE QUERY query_identity_graph_by_ids(SET<STRING> ids) FOR GRAPH SocialGraph {
TYPEDEF TUPLE< VERTEX source_v, VERTEX target_v, STRING data_source, STRING edge_type > IdentityConnection;
TYPEDEF TUPLE< STRING chain, STRING address > Address;

SetAccum<Address> @owner_address;
SetAccum<Address> @resolve_address;

SetAccum<IdentityConnection> @@edges;
SetAccum<VERTEX> @@hyper_vset;
@@hyper_vset = to_vertex_set(ids, "IdentitiesGraph");
hyper_vertices (IdentitiesGraph) = {@@hyper_vset};

FOREACH hv IN @@hyper_vset DO
@@edges.clear();
hyper_v = {hv};
vset = SELECT v FROM Identities:v-((PartOfIdentitiesGraph>):e)-hyper_v LIMIT 500;

tmp1 = SELECT v1 FROM vset:v1-((Proof_Forward>|Proof_Backward>):e1)-vset:v2
ACCUM @@edges += IdentityConnection(v1, v2, e1.source, "Proof");
tmp2 = SELECT v1 FROM vset:v1-((<Proof_Forward|<Proof_Backward):e2)-vset:v2
ACCUM @@edges += IdentityConnection(v2, v1, e2.source, "Proof");
tmp4 = SELECT v1 FROM vset:v1-((Hold_Identity>):e1)-vset:i-((Resolve>):e2)-vset:v2
WHERE i.platform != "ENS" AND i.platform != "ens" AND i.platform != "sns"
ACCUM @@edges += IdentityConnection(v1, i, e1.source, "Hold"),
i.@owner_address += Address(v1.platform, v1.identity),
i.@resolve_address += Address(v2.platform, v2.identity);
tmp5 = SELECT v1 FROM vset:v1-((Hold_Identity>):e1)-vset:v2
WHERE v2.platform != "ENS" AND v2.platform != "ens" AND v2.platform != "sns"
ACCUM @@edges += IdentityConnection(v1, v2, e1.source, "Hold"),
v2.@owner_address += Address(v1.platform, v1.identity);
tmp3 = SELECT v1 FROM vset:v1-((Resolve>):r)-vset:v2
WHERE v1.platform == "ENS" OR v1.platform == "ens" OR v1.platform == "sns"
ACCUM
@@edges += IdentityConnection(v1, v2, r.source, "Resolve"),
v1.@resolve_address += Address(v2.platform, v2.identity);
tmp3_1 = SELECT v1 FROM vset:v1-((Hold_Identity>):e1)-vset:v2
WHERE v2.platform == "ENS" OR v2.platform == "ens" OR v2.platform == "sns"
ACCUM v2.@owner_address += Address(v1.platform, v1.identity);

tmp6 = SELECT v1 FROM vset:v1-((Reverse_Resolve>):e1)-vset:v2
ACCUM @@edges += IdentityConnection(v1, v2, e1.source, "Reverse_Resolve");

PRINT hv as graph_id, vset as vertices, @@edges as edges;
END;
}


CREATE OR REPLACE QUERY social_follows(VERTEX<IdentitiesGraph> g, INT hop, SET<STRING> sources, INT numPerPage = 200, INT pageNum = 0) FOR GRAPH SocialGraph {
TYPEDEF TUPLE< VERTEX source_v, VERTEX target_v, STRING original_from, STRINg original_to, STRING data_source, STRING edge_type , STRING tag, DATETIME updated_at > Relation;

// TYPEDEF TUPLE< STRING chain, STRING address > Address;
// SetAccum<Address> @owner_address;
// SetAccum<Address> @resolve_address;

OrAccum @visited = FALSE;
SumAccum<INT> @degree;
SumAccum<INT> @range;
SumAccum<INT> @@all_count;

SetAccum<Relation> @@edges;
SetAccum<EDGE<Follow>> @@follow_edges;
SetAccum<VERTEX> @@vertices;

SetAccum<STRING> @@original_vid;
SetAccum<VERTEX> @@original_vset; // collect for original Identities vertex

seed (IdentitiesGraph) = {g};
seed = SELECT s FROM seed:s
ACCUM s.@visited += true,
s.@degree = s.outdegree("Follow");
@@vertices += g;

T = SELECT hv FROM seed-((Follow>|<Follow):e)-IdentitiesGraph:hv
PER (hv)
ACCUM @@all_count += 1;

WHILE(seed.size()>0) LIMIT hop DO
SetAccum<VERTEX> @@pool;
// mutual follow
INT size_0 = @@vertices.size();
INT index_start = numPerPage * pageNum;
INT index_end = numPerPage;
IF index_end - size_0 <= 0 THEN
BREAK;
END;

mutual_follow = SELECT hv FROM seed:s1-((Follow>):e1)-IdentitiesGraph:hv-((Follow>):e2)-seed:s2
WHERE hv.@visited == FALSE AND s1 == s2
POST-ACCUM
hv.@visited = TRUE,
hv.@degree = hv.outdegree("Follow")
ORDER BY hv.@degree DESC
LIMIT index_start, index_end - size_0;

tmp_1 = SELECT tgt FROM seed:s1-((Follow>):e1)-mutual_follow:tgt-((Follow>):e2)-seed:s2
WHERE s1 == s2 AND s1 != tgt
PER (s1, s2, e1, e2, tgt)
ACCUM
IF e1.source == e2.source THEN
@@edges += Relation(s1, tgt, e1.original_from, e1.original_to, e1.source, "following", "mutual_follow", e1.updated_at),
@@edges += Relation(tgt, s2, e2.original_from, e2.original_to, e2.source, "follower", "mutual_follow", e2.updated_at),
@@original_vid += e1.original_from,
@@original_vid += e1.original_to,
tgt.@range += 3,
@@pool += tgt,
@@vertices += tgt
END;

INT size_1 = @@vertices.size();
IF index_end - size_1 <= 0 THEN
BREAK;
END;

following = SELECT hv FROM seed:s1-((Follow>):e)-IdentitiesGraph:hv
WHERE hv.@visited == FALSE
POST-ACCUM
hv.@visited = TRUE,
hv.@degree = hv.outdegree("Follow")
ORDER BY hv.@degree DESC
LIMIT index_start, index_end - size_1;

tmp_2 = SELECT tgt FROM seed:s1-((Follow>):e)-following:tgt
PER (s1, e, tgt)
ACCUM
@@edges += Relation(s1, tgt, e.original_from, e.original_to, e.source, "following", "", e.updated_at),
@@original_vid += e.original_from,
@@original_vid += e.original_to,
tgt.@range += 2,
@@pool += tgt,
@@vertices += tgt;


INT size_2 = @@vertices.size();
IF index_end - size_2 <= 0 THEN
BREAK;
END;

follower = SELECT hv FROM seed:s1-((<Follow):e)-IdentitiesGraph:hv
WHERE hv.@visited == FALSE
POST-ACCUM
hv.@visited = TRUE,
hv.@degree = hv.outdegree("Follow")
ORDER BY hv.@degree DESC
LIMIT index_start, index_end - size_2;

tmp_3 = SELECT tgt FROM seed:s1-((<Follow):e)-follower:tgt
PER (s1, e, tgt)
ACCUM
@@edges += Relation(s1, tgt, e.original_from, e.original_to, e.source, "follower", "", e.updated_at),
@@original_vid += e.original_from,
@@original_vid += e.original_to,
tgt.@range += 1,
@@pool += tgt,
@@vertices += tgt;

seed (IdentitiesGraph) = {@@pool};
END;

@@original_vset = to_vertex_set(@@original_vid, "Identities");
original_vertices = { @@original_vset };
vertices = { @@vertices };

// tmp = SELECT domain FROM original_vertices:domain-((<Hold_Identity):e)-Identities:owner
// ACCUM domain.@owner_address += Address(owner.platform, owner.identity);
// tmp2 = SELECT domain FROM original_vertices:domain-((Resolve>):e)-Identities:tgt
// ACCUM domain.@resolve_address += Address(tgt.platform, tgt.identity);

PRINT @@all_count as all_count, @@edges as edges, vertices, original_vertices;
}


CREATE OR REPLACE QUERY neighbors(VERTEX<Identities> p, INT depth) FOR GRAPH SocialGraph {
SetAccum<EDGE> @@edges;
SetAccum<VERTEX> @@vertices;
Expand Down
29 changes: 29 additions & 0 deletions src/config/tdb/migrations/run_loading_jobs.gsql
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,32 @@ CREATE LOADING JOB Load_DBExport FOR GRAPH IdentityGraph {
LOAD "/home/tigergraph/shared_data/export_graphs/GlobalTypes/Resolve_Contract.csv" TO EDGE Resolve_Contract VALUES($"from", $"to", $"source", $"system", $"name", $"uuid", $"updated_at", $"fetcher") USING SEPARATOR="\t", HEADER="true", EOL="\n";
LOAD "/home/tigergraph/shared_data/export_graphs/GlobalTypes/Reverse_Resolve_Contract.csv" TO EDGE Reverse_Resolve_Contract VALUES($"from", $"to", $"source", $"system", $"name", $"uuid", $"updated_at", $"fetcher") USING SEPARATOR="\t", HEADER="true", EOL="\n";
}


CREATE LOADING JOB Load_LensSocialFeed FOR GRAPH SocialGraph {
// ethereum identity and lens identity already have been loading in call `upsert_hyper_vertex()`
// from to source uuid id updated_at fetcher
LOAD "/home/tigergraph/shared_data/lens_v2_social_feeds/lens.hold.tsv"
TO EDGE Hold_Identity VALUES ($"from", $"to", $"source", REDUCE(ignore_if_exists($"uuid")), _, $"id", _, REDUCE(max($"updated_at")), $"fetcher", _) USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
// from to source system name uuid updated_at fetcher
LOAD "/home/tigergraph/shared_data/lens_v2_social_feeds/lens.resolve.tsv"
TO EDGE Resolve VALUES ($"from", $"to", $"source", $"system", $"name", REDUCE(ignore_if_exists($"uuid")), REDUCE(max($"updated_at")), $"fetcher") USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
// from to source system name uuid updated_at fetcher
LOAD "/home/tigergraph/shared_data/lens_v2_social_feeds/lens.reverse_resolve.tsv"
TO EDGE Reverse_Resolve VALUES ($"from", $"to", $"source", $"system", $"name", REDUCE(ignore_if_exists($"uuid")), REDUCE(max($"updated_at")), $"fetcher") USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
// from to original_from original_to source updated_at
LOAD "/home/tigergraph/shared_data/lens_v2_social_feeds/lens.follow.tsv"
TO EDGE Follow VALUES ($"from", $"to", $"original_from", $"original_to", $"source", REDUCE(max($"updated_at"))) USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
}

RUN LOADING JOB Load_LensSocialFeed


CREATE LOADING JOB Load_KeybaseSocialFeed FOR GRAPH SocialGraph {
LOAD "/home/tigergraph/shared_data/keybase_social_feeds/keybase.proof.tsv"
TO EDGE Proof_Forward VALUES ($"from", $"to", $"source", $"created_at", $"uuid", $"level", $"record_id", $"updated_at", $"fetcher") USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
LOAD "/home/tigergraph/shared_data/keybase_social_feeds/keybase.follow.tsv"
TO EDGE Follow VALUES ($"from", $"to", $"original_from", $"original_to", $"source", REDUCE(max($"updated_at"))) USING SEPARATOR = "\t", EOL = "\n", HEADER = "true";
}

RUN LOADING JOB Load_KeybaseSocialFeed
40 changes: 37 additions & 3 deletions src/controller/tigergraphql/identity_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,55 @@ use crate::{
tigergraph::{
edge::{EdgeUnion, HoldRecord},
vertex::{
Address, ExpandIdentityRecord, IdentityConnection, IdentityGraph, IdentityRecord,
IdentityWithSource, OwnerLoadFn,
query_identity_graph_by_ids, Address, ExpandIdentityRecord, IdentityConnection,
IdentityGraph, IdentityRecord, IdentityWithSource, OwnerLoadFn,
},
},
upstream::{fetch_all, Chain, ContractCategory, DataSource, Platform, Target},
util::make_http_client,
};
use async_graphql::{Context, Object};
use async_graphql::{Context, InputObject, Object};
use dataloader::non_cached::Loader;
use serde::{Deserialize, Serialize};
use tracing::{event, Level};
use uuid::Uuid;

#[derive(InputObject, Debug, Clone, Serialize, Deserialize)]
pub struct IdentityFilter {
identity: String,
platform: String,
}

#[derive(InputObject, Clone, Debug, Serialize, Deserialize)]
pub struct IdentityGraphFilter {
by_graph_id: Option<Vec<String>>,
by_identity_platform: Option<Vec<IdentityFilter>>,
by_identity_id: Option<Vec<String>>,
}

#[derive(Default)]
pub struct IdentityGraphQuery;

#[Object]
impl IdentityGraphQuery {
async fn identity_graph(
&self,
_ctx: &Context<'_>,
filter: IdentityGraphFilter,
) -> Result<Vec<IdentityGraph>> {
if let Some(graph_ids) = filter.by_graph_id {
let client = make_http_client();
Ok(query_identity_graph_by_ids(&client, graph_ids).await?)
} else if let Some(_identity_filters) = filter.by_identity_platform {
Ok(vec![])
} else if let Some(_identity_filters) = filter.by_identity_id {
Ok(vec![])
} else {
Err(Error::ParamMissing("Must use filter to query".to_string()))
}
}
}

#[Object]
impl IdentityGraph {
/// Connecting a person’s different identifiers together, form an identity graph
Expand Down
7 changes: 6 additions & 1 deletion src/controller/tigergraphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ mod identity_graph;
mod proof;
mod relation;
mod resolve;
mod social;

use self::{hold::HoldQuery, identity::IdentityQuery, proof::ProofQuery, resolve::ResolveQuery};
use self::{
hold::HoldQuery, identity::IdentityQuery, proof::ProofQuery, resolve::ResolveQuery,
social::RelationQuery,
};
use async_graphql::{MergedObject, Object};
const API_VERSION: &str = "0.1";

Expand All @@ -18,6 +22,7 @@ pub struct Query(
ResolveQuery,
ProofQuery,
HoldQuery,
RelationQuery,
);

#[derive(Default)]
Expand Down
Loading

0 comments on commit 28ca354

Please sign in to comment.