Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
andreivcodes committed Jan 31, 2025
1 parent 6d936d4 commit 8b15f87
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 40 deletions.
32 changes: 22 additions & 10 deletions apps/detective/indexers/snapshot_proposals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use sea_orm::{
ActiveValue::{self, NotSet},
ColumnTrait, Condition, EntityTrait, QueryFilter, Set,
ColumnTrait, Condition, DatabaseConnection, EntityTrait, QueryFilter, Set,
};
use seaorm::{
dao, dao_indexer, proposal,
Expand Down Expand Up @@ -66,11 +66,15 @@ struct ProposalMetadata {

pub struct SnapshotProposalsIndexer {
api_handler: Arc<SnapshotApiHandler>,
db_handler: Option<Arc<DatabaseConnection>>,
}

impl SnapshotProposalsIndexer {
pub fn new(api_handler: Arc<SnapshotApiHandler>) -> Self {
Self { api_handler }
pub fn new(api_handler: Arc<SnapshotApiHandler>, db: Option<Arc<DatabaseConnection>>) -> Self {
Self {
api_handler,
db_handler: db,
}
}
}

Expand Down Expand Up @@ -183,7 +187,14 @@ impl ProposalsIndexer for SnapshotProposalsIndexer {
};

let proposals = if let Some(data) = graphql_response.data {
match parse_proposals(data.proposals, indexer, self.api_handler.clone()).await {
match parse_proposals(
data.proposals,
indexer,
self.api_handler.clone(),
self.db_handler.as_ref().unwrap().clone(),
)
.await
{
Ok(parsed_proposals) => parsed_proposals,
Err(e) => return Err(anyhow!("Failed to parse proposals: {}", e)),
}
Expand Down Expand Up @@ -244,6 +255,7 @@ async fn parse_proposals(
graphql_proposals: Vec<GraphQLProposal>,
indexer: &dao_indexer::Model,
snapshot_api: Arc<SnapshotApiHandler>,
db: Arc<DatabaseConnection>,
) -> Result<Vec<proposal::ActiveModel>> {
let mut proposals = vec![];

Expand Down Expand Up @@ -317,10 +329,11 @@ async fn parse_proposals(
proposals.push(proposal_model);

if p.privacy.as_str() == "shutter" && p.scores_state.as_str() == "final" {
if let Err(e) = refresh_shutter_votes(indexer.clone(), snapshot_api.clone(), p.id).await
let db_clone = db.clone();
if let Err(e) =
refresh_shutter_votes(indexer.clone(), snapshot_api.clone(), p.id, db_clone).await
{
tracing::error!("Failed to refresh shutter votes: {}", e);
// Optionally, continue processing other proposals or return an error
}
}
}
Expand Down Expand Up @@ -359,12 +372,11 @@ async fn refresh_shutter_votes(
indexer: dao_indexer::Model,
snapshot_api: Arc<SnapshotApiHandler>,
proposal_id: String,
db: Arc<DatabaseConnection>,
) -> Result<()> {
let db = DatabaseStore::connect().await?;

let dao = match dao::Entity::find()
.filter(dao::Column::Id.eq(indexer.dao_id))
.one(&db)
.one(&*db)
.await?
{
Some(dao) => dao,
Expand Down Expand Up @@ -622,7 +634,7 @@ mod snapshot_proposals_tests {
};

let snapshot_api_handler = Arc::new(SnapshotApiHandler::new(SnapshotApiConfig::default()));
let snapshot_indexer = SnapshotProposalsIndexer::new(snapshot_api_handler);
let snapshot_indexer = SnapshotProposalsIndexer::new(snapshot_api_handler, None);

match snapshot_indexer.process_proposals(&indexer, &dao).await {
Ok(ProcessResult::Proposals(proposals, _)) => {
Expand Down
22 changes: 12 additions & 10 deletions apps/detective/indexers/snapshot_votes.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::{
database::DatabaseStore,
indexer::{Indexer, ProcessResult, VotesIndexer},
SnapshotApiHandler,
};
use anyhow::Result;
use async_trait::async_trait;
use chrono::DateTime;
use sea_orm::{
ActiveValue::NotSet, ColumnTrait, EntityTrait, QueryFilter, QueryOrder, QuerySelect, Set,
ActiveValue::NotSet, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder,
QuerySelect, Set,
};
use seaorm::{dao, dao_indexer, proposal, sea_orm_active_enums::IndexerVariant, vote};
use serde::Deserialize;
Expand Down Expand Up @@ -43,11 +43,15 @@ struct GraphQLVote {

pub struct SnapshotVotesIndexer {
api_handler: Arc<SnapshotApiHandler>,
db_handler: Option<Arc<DatabaseConnection>>,
}

impl SnapshotVotesIndexer {
pub fn new(api_handler: Arc<SnapshotApiHandler>) -> Self {
Self { api_handler }
pub fn new(api_handler: Arc<SnapshotApiHandler>, db: Option<Arc<DatabaseConnection>>) -> Self {
Self {
api_handler,
db_handler: db,
}
}

pub fn proposal_indexer_variant() -> IndexerVariant {
Expand Down Expand Up @@ -85,8 +89,6 @@ impl VotesIndexer for SnapshotVotesIndexer {
) -> Result<ProcessResult> {
info!("Processing Snapshot Votes");

let db = DatabaseStore::connect().await?;

let proposal_limit = (indexer.speed / 10).max(10);

let proposals = proposal::Entity::find()
Expand All @@ -99,9 +101,9 @@ impl VotesIndexer for SnapshotVotesIndexer {
.inner_join(dao_indexer::Entity)
.filter(dao_indexer::Column::IndexerVariant.eq(IndexerVariant::SnapshotProposals))
.order_by(proposal::Column::EndAt, sea_orm::Order::Asc)
.limit(proposal_limit as u64) // hacky way to get around too many proposals in the query
.all(&db) // indexer.speed is used both for the number of votes to pull and the number of proposals to use
.await?; // if the query fails, they both decrease until the query works
.limit(proposal_limit as u64)
.all(self.db_handler.as_ref().unwrap().as_ref())
.await?;

let proposals_ext_ids: Vec<String> =
proposals.iter().map(|p| p.external_id.clone()).collect();
Expand Down Expand Up @@ -261,7 +263,7 @@ mod snapshot_votes_tests {
};

let snapshot_api_handler = Arc::new(SnapshotApiHandler::new(SnapshotApiConfig::default()));
let snapshot_indexer = SnapshotVotesIndexer::new(snapshot_api_handler);
let snapshot_indexer = SnapshotVotesIndexer::new(snapshot_api_handler, None);

match snapshot_indexer.process_votes(&indexer, &dao).await {
Ok(ProcessResult::Votes(votes, _)) => {
Expand Down
56 changes: 36 additions & 20 deletions apps/detective/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async fn main() -> Result<()> {
let indexer_variant = indexer.indexer_variant.clone();

// Get the indexer implementation to access its refresh interval
let indexer_impl = get_indexer(&indexer_variant);
let indexer_impl = get_indexer(&indexer_variant, &db);
let interval = indexer_impl.refresh_interval();

// Check if enough time has passed since last processing
Expand Down Expand Up @@ -265,12 +265,11 @@ fn create_job_consumer(
"Processing indexer"
);

// Get the indexer implementation early
let indexer_implementation = get_indexer(&indexer.indexer_variant);
let indexer_implementation = get_indexer(&indexer.indexer_variant, &db);

let result = tokio::time::timeout(
indexer_implementation.timeout(),
process_job(&indexer, &dao),
process_job(&indexer, &dao, &db),
)
.await;

Expand Down Expand Up @@ -341,18 +340,22 @@ fn create_job_consumer(
})
}

#[instrument]
async fn process_job(indexer: &dao_indexer::Model, dao: &dao::Model) -> Result<ProcessResult> {
#[instrument(skip(db))]
async fn process_job(
indexer: &dao_indexer::Model,
dao: &dao::Model,
db: &DatabaseConnection,
) -> Result<ProcessResult> {
match indexer.indexer_type {
IndexerType::Proposals => {
if let Some(proposal_indexer) = get_proposals_indexer(&indexer.indexer_variant) {
if let Some(proposal_indexer) = get_proposals_indexer(&indexer.indexer_variant, db) {
proposal_indexer.process_proposals(indexer, dao).await
} else {
bail!("Unsupported proposals indexer variant")
}
}
IndexerType::Votes => {
if let Some(vote_indexer) = get_votes_indexer(&indexer.indexer_variant) {
if let Some(vote_indexer) = get_votes_indexer(&indexer.indexer_variant, db) {
vote_indexer.process_votes(indexer, dao).await
} else {
bail!("Unsupported votes indexer variant")
Expand Down Expand Up @@ -420,11 +423,15 @@ async fn store_process_results(
Ok(())
}

#[instrument]
fn get_proposals_indexer(indexer_variant: &IndexerVariant) -> Option<Box<dyn ProposalsIndexer>> {
#[instrument(skip(db))]
fn get_proposals_indexer(
indexer_variant: &IndexerVariant,
db: &DatabaseConnection,
) -> Option<Box<dyn ProposalsIndexer>> {
match indexer_variant {
IndexerVariant::SnapshotProposals => Some(Box::new(SnapshotProposalsIndexer::new(
SNAPSHOT_API_HANDLER.clone(),
Some(Arc::new(db.clone())),
))),
IndexerVariant::AaveV2MainnetProposals => Some(Box::new(AaveV2MainnetProposalsIndexer)),
IndexerVariant::AaveV3MainnetProposals => Some(Box::new(AaveV3MainnetProposalsIndexer)),
Expand Down Expand Up @@ -461,11 +468,15 @@ fn get_proposals_indexer(indexer_variant: &IndexerVariant) -> Option<Box<dyn Pro
}
}

#[instrument]
fn get_votes_indexer(indexer_variant: &IndexerVariant) -> Option<Box<dyn VotesIndexer>> {
#[instrument(skip(db))]
fn get_votes_indexer(
indexer_variant: &IndexerVariant,
db: &DatabaseConnection,
) -> Option<Box<dyn VotesIndexer>> {
match indexer_variant {
IndexerVariant::SnapshotVotes => Some(Box::new(SnapshotVotesIndexer::new(
SNAPSHOT_API_HANDLER.clone(),
Some(Arc::new(db.clone())),
))),
IndexerVariant::AaveV2MainnetVotes => Some(Box::new(AaveV2MainnetVotesIndexer)),
IndexerVariant::AaveV3MainnetVotes => Some(Box::new(AaveV3MainnetVotesIndexer)),
Expand Down Expand Up @@ -526,15 +537,20 @@ fn get_delegation_indexer(indexer_variant: &IndexerVariant) -> Option<Box<dyn De
}
}

#[instrument]
fn get_indexer(indexer_variant: &IndexerVariant) -> Box<dyn indexer::Indexer> {
#[instrument(skip(db))]
fn get_indexer(
indexer_variant: &IndexerVariant,
db: &DatabaseConnection,
) -> Box<dyn indexer::Indexer> {
match indexer_variant {
IndexerVariant::SnapshotProposals => {
Box::new(SnapshotProposalsIndexer::new(SNAPSHOT_API_HANDLER.clone()))
}
IndexerVariant::SnapshotVotes => {
Box::new(SnapshotVotesIndexer::new(SNAPSHOT_API_HANDLER.clone()))
}
IndexerVariant::SnapshotProposals => Box::new(SnapshotProposalsIndexer::new(
SNAPSHOT_API_HANDLER.clone(),
Some(Arc::new(db.clone())),
)),
IndexerVariant::SnapshotVotes => Box::new(SnapshotVotesIndexer::new(
SNAPSHOT_API_HANDLER.clone(),
Some(Arc::new(db.clone())),
)),
IndexerVariant::AaveV2MainnetProposals => Box::new(AaveV2MainnetProposalsIndexer),
IndexerVariant::AaveV2MainnetVotes => Box::new(AaveV2MainnetVotesIndexer),
IndexerVariant::AaveV3MainnetProposals => Box::new(AaveV3MainnetProposalsIndexer),
Expand Down

0 comments on commit 8b15f87

Please sign in to comment.