feat(ops): add ops crate for bundling misc tools. add a tree backfiller based on cl_audits_v2.
kespinola committed Jan 24, 2024
1 parent 62bb23c commit 7847939
Showing 17 changed files with 1,266 additions and 0 deletions.
93 changes: 93 additions & 0 deletions Cargo.lock

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -1,11 +1,13 @@
[workspace]
resolver = "2"
members = [
    "core",
    "das_api",
    "digital_asset_types",
    "metaplex-rpc-proxy",
    "migration",
    "nft_ingester",
    "ops",
    "tools/acc_forwarder",
    "tools/bgtask_creator",
    "tools/fetch_trees",
@@ -34,6 +36,7 @@ cadence = "0.29.0"
cadence-macros = "0.29.0"
chrono = "0.4.19"
clap = "4.2.2"
das-core = { path = "core" }
digital_asset_types = { path = "digital_asset_types" }
enum-iterator = "1.2.0"
enum-iterator-derive = "1.1.0"
18 changes: 18 additions & 0 deletions core/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "das-core"
version.workspace = true
edition.workspace = true
repository.workspace = true
publish.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { workspace = true }
anyhow = { workspace = true }
sqlx = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }

[lints]
workspace = true
38 changes: 38 additions & 0 deletions core/src/db.rs
@@ -0,0 +1,38 @@
use anyhow::Result;
use clap::Parser;
use sqlx::{
    postgres::{PgConnectOptions, PgPoolOptions},
    PgPool,
};

#[derive(Debug, Parser, Clone)]
pub struct PoolArgs {
    /// The database URL.
    #[arg(long, env)]
    pub database_url: String,
    /// The maximum number of connections to the database.
    #[arg(long, env, default_value = "125")]
    pub database_max_connections: u32,
    /// The minimum number of connections to the database.
    #[arg(long, env, default_value = "5")]
    pub database_min_connections: u32,
}

/// Establishes a connection pool to the database using the provided configuration.
///
/// # Arguments
///
/// * `config` - A `PoolArgs` struct containing the database URL and the minimum and maximum number of connections.
///
/// # Returns
///
/// * `Result<PgPool, sqlx::Error>` - On success, returns a `PgPool`. On failure, returns a `sqlx::Error`.
pub async fn connect_db(config: PoolArgs) -> Result<PgPool, sqlx::Error> {
    let options: PgConnectOptions = config.database_url.parse()?;

    PgPoolOptions::new()
        .min_connections(config.database_min_connections)
        .max_connections(config.database_max_connections)
        .connect_with(options)
        .await
}
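
A usage sketch (not part of this commit): a binary can flatten PoolArgs into its own clap parser and open the pool with connect_db. The Args struct and the printed message below are illustrative assumptions, not code from the repository.

use clap::Parser;
use das_core::{connect_db, PoolArgs};

#[derive(Parser)]
struct Args {
    #[command(flatten)]
    database: PoolArgs,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Builds a PgPool bounded by the configured min/max connections;
    // --database-url can also be supplied via the DATABASE_URL environment variable.
    let pool = connect_db(args.database).await?;

    println!("connected, pool size = {}", pool.size());

    Ok(())
}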
5 changes: 5 additions & 0 deletions core/src/lib.rs
@@ -0,0 +1,5 @@
mod db;
mod metrics;

pub use db::*;
pub use metrics::*;
31 changes: 31 additions & 0 deletions core/src/metrics.rs
@@ -0,0 +1,31 @@
use anyhow::Result;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
use cadence_macros::set_global_default;
use clap::Parser;
use std::net::UdpSocket;

#[derive(Clone, Parser, Debug)]
pub struct MetricsArgs {
    #[arg(long, env, default_value = "127.0.0.1")]
    pub metrics_host: String,
    #[arg(long, env, default_value = "8125")]
    pub metrics_port: u16,
    #[arg(long, env, default_value = "das.backfiller")]
    pub metrics_prefix: String,
}

pub fn setup_metrics(config: MetricsArgs) -> Result<()> {
    let host = (config.metrics_host, config.metrics_port);

    let socket = UdpSocket::bind("0.0.0.0:0")?;
    socket.set_nonblocking(true)?;

    let udp_sink = BufferedUdpMetricSink::from(host, socket)?;
    let queuing_sink = QueuingMetricSink::from(udp_sink);

    let client = StatsdClient::from_sink(&config.metrics_prefix, queuing_sink);

    set_global_default(client);

    Ok(())
}
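
For reference, a minimal sketch (not from the commit) of wiring MetricsArgs into a binary and emitting a counter through the global client installed by setup_metrics; the metric name is hypothetical.

use cadence_macros::statsd_count;
use clap::Parser;
use das_core::{setup_metrics, MetricsArgs};

#[derive(Parser)]
struct Args {
    #[command(flatten)]
    metrics: MetricsArgs,
}

fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Installs the queuing UDP statsd client as the global default once at startup.
    setup_metrics(args.metrics)?;

    // Any later code in the process can emit metrics via cadence_macros.
    statsd_count!("backfiller.trees.processed", 1);

    Ok(())
}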
2 changes: 2 additions & 0 deletions migration/src/lib.rs
@@ -35,6 +35,7 @@ mod m20231206_120101_remove_was_decompressed;
mod m20240104_203133_add_cl_audits_v2;
mod m20240104_203328_remove_cl_audits;
mod m20240116_130744_add_update_metadata_ix;
mod m20240124_173104_add_tree_seq_index_to_cl_audits_v2;

pub mod model;

@@ -79,6 +80,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240104_203133_add_cl_audits_v2::Migration),
Box::new(m20240104_203328_remove_cl_audits::Migration),
Box::new(m20240116_130744_add_update_metadata_ix::Migration),
Box::new(m20240124_173104_add_tree_seq_index_to_cl_audits_v2::Migration),
]
}
}
34 changes: 34 additions & 0 deletions migration/src/m20240124_173104_add_tree_seq_index_to_cl_audits_v2.rs
@@ -0,0 +1,34 @@
use super::model::table::ClAuditsV2;
use sea_orm::{ConnectionTrait, DatabaseBackend, Statement};
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();

        conn.execute(Statement::from_string(
            DatabaseBackend::Postgres,
            "CREATE INDEX IF NOT EXISTS tree_seq_idx ON cl_audits_v2 (tree, seq);".to_string(),
        ))
        .await?;

        Ok(())
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_index(
                Index::drop()
                    .name("tree_seq_idx")
                    .table(ClAuditsV2::Table)
                    .to_owned(),
            )
            .await?;

        Ok(())
    }
}
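
The composite (tree, seq) index exists to let a cl_audits_v2-based backfiller walk a single tree's audit rows in sequence order without a full table scan. A hedged sqlx sketch of the kind of query it supports follows; the tx column and the helper function are assumptions, not code from this commit.

use sqlx::{PgPool, Row};

// Fetch one tree's audit rows in ascending seq order; with tree_seq_idx in place,
// Postgres can serve this as an index range scan on (tree, seq).
async fn audits_for_tree(pool: &PgPool, tree: &[u8]) -> Result<Vec<(i64, Vec<u8>)>, sqlx::Error> {
    let rows = sqlx::query("SELECT seq, tx FROM cl_audits_v2 WHERE tree = $1 ORDER BY seq ASC")
        .bind(tree)
        .fetch_all(pool)
        .await?;

    Ok(rows
        .into_iter()
        .map(|row| (row.get::<i64, _>("seq"), row.get::<Vec<u8>, _>("tx")))
        .collect())
}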
56 changes: 56 additions & 0 deletions ops/Cargo.toml
@@ -0,0 +1,56 @@
[package]
name = "das-ops"
version.workspace = true
edition.workspace = true
repository.workspace = true
publish.workspace = true

[[bin]]
name = "das-ops"

[dependencies]

clap = { workspace = true, features = ["derive", "cargo", "env"] }
das-core = { workspace = true }
backon = "0.4.1"
log = { workspace = true }
env_logger = { workspace = true }
anyhow = { workspace = true }
redis = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
base64 = { workspace = true }
indicatif = "0.17.5"
thiserror = { workspace = true }
serde_json = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }
anchor-client = { workspace = true }
tokio = { workspace = true }
sqlx = { workspace = true }
sea-orm = { workspace = true }
sea-query = { workspace = true }
chrono = { workspace = true }
tokio-postgres = { workspace = true }
serde = { workspace = true }
bs58 = { workspace = true }
reqwest = { workspace = true }
plerkle_messenger = { workspace = true }
plerkle_serialization = { workspace = true }
flatbuffers = { workspace = true }
lazy_static = { workspace = true }
regex = { workspace = true }
digital_asset_types = { workspace = true }
mpl-bubblegum = { workspace = true }
spl-account-compression = { workspace = true }
spl-concurrent-merkle-tree = { workspace = true }
uuid = { workspace = true }
figment = { workspace = true }
solana-sdk = { workspace = true }
solana-client = { workspace = true }
solana-transaction-status = { workspace = true }
solana-account-decoder = { workspace = true }
rust-crypto = { workspace = true }
url = { workspace = true }
anchor-lang = { workspace = true }
borsh = { workspace = true }