From bb0641e4daea9c23000c48add3d0298cfd66aeb7 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 11:39:37 +0500 Subject: [PATCH 1/6] airdrop mints --- Cargo.lock | 10 + Cargo.toml | 1 + build.rs | 6 +- src/cache.rs | 87 ++++++- src/cli.rs | 20 ++ src/commands/airdrop.rs | 367 +++++++++++++++++++++++++++++ src/commands/upload_drop.rs | 26 +- src/common/error.rs | 20 ++ src/main.rs | 7 +- src/proto/mint-random-queued.proto | 15 ++ src/queries/mint-random.graphql | 8 + src/queries/mint-status.graphql | 6 + 12 files changed, 546 insertions(+), 27 deletions(-) create mode 100644 src/commands/airdrop.rs create mode 100644 src/common/error.rs create mode 100644 src/proto/mint-random-queued.proto create mode 100644 src/queries/mint-random.graphql create mode 100644 src/queries/mint-status.graphql diff --git a/Cargo.lock b/Cargo.lock index 956c8c4..24c0df0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,6 +196,15 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "bs58" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5353f36341f7451062466f0b755b96ac3a9547e4d7f6b70d603fc721a7d7896" +dependencies = [ + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -797,6 +806,7 @@ name = "holaplex-hub-cli" version = "0.1.0" dependencies = [ "anyhow", + "bs58", "clap", "crossbeam", "dashmap", diff --git a/Cargo.toml b/Cargo.toml index 3353465..ab941f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ toml = { version = "0.8.0", features = ["preserve_order"] } url = { version = "2.4.1", features = ["serde"] } uuid = { version = "1.4.1", features = ["serde"] } xxhash-rust = { version = "0.8.7", features = ["xxh3", "const_xxh3"] } +bs58 = "0.5" [build-dependencies] prost-build = "0.12.1" diff --git a/build.rs b/build.rs index c1d7b12..c090cc9 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,10 @@ fn main() { prost_build::compile_protos( - &["src/proto/asset-upload.proto", "src/proto/drop-mint.proto"], + &[ + "src/proto/asset-upload.proto", + "src/proto/drop-mint.proto", + "src/proto/mint-random-queued.proto", + ], &["src/proto"], ) .unwrap(); diff --git a/src/cache.rs b/src/cache.rs index e262bac..b336dd1 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -22,9 +22,10 @@ use crate::cli::CacheOpts; mod proto { include!(concat!(env!("OUT_DIR"), "/cache.asset_uploads.rs")); include!(concat!(env!("OUT_DIR"), "/cache.drop_mints.rs")); + include!(concat!(env!("OUT_DIR"), "/cache.mint_random_queued.rs")); } -pub use proto::{AssetUpload, DropMint}; +pub use proto::{AssetUpload, CreationStatus, DropMint, MintRandomQueued as ProtoMintRandomQueued}; #[derive(Clone, Copy, PartialEq, Eq)] #[repr(transparent)] @@ -381,6 +382,90 @@ impl<'a> AssetUploadCache<'a> { } } +#[repr(transparent)] +pub struct AirdropWalletsCache<'a>(Cow<'a, Cache>); + +impl<'a> CacheFamily<'a> for AirdropWalletsCache<'a> { + type Static = AirdropWalletsCache<'static>; + + const CF_NAME: &'static str = "airdrop-wallets"; + + #[inline] + fn new(cache: Cow<'a, Cache>) -> Self { Self(cache) } + + #[inline] + fn cache(&self) -> &Cow<'a, Cache> { &self.0 } +} + +impl<'a> AirdropWalletsCache<'a> { + pub async fn get( + &self, + path: impl AsRef + Send + 'static, + pubkey: String, + ) -> Result> { + let this = self.to_static(); + spawn_blocking(move || this.get_sync(path, pubkey)) + .await + .unwrap() + } + + pub fn get_sync( + &self, + path: impl AsRef, + key: String, + ) -> Result> { + let path = path.as_ref(); + + let bytes = self + .0 + .db + .get_cf_opt( + &self.0.get_cf(Self::CF_NAME)?, + key.into_bytes(), + self.0.config.read_opts(), + ) + .with_context(|| format!("Error getting airdrop wallet for {path:?}"))?; + + let Some(bytes) = bytes else { return Ok(None) }; + let airdrop = ProtoMintRandomQueued::decode(&*bytes) + .with_context(|| format!("Error parsing for {path:?} (this should not happen)"))?; + + Ok(Some(airdrop)) + } + + pub async fn set( + &self, + path: impl AsRef + Send + 'static, + key: &'static str, + airdrop: ProtoMintRandomQueued, + ) -> Result<()> { + let this = self.to_static(); + spawn_blocking(move || this.set_sync(path, key, &airdrop)) + .await + .unwrap() + } + + pub fn set_sync( + &self, + path: impl AsRef, + key: &str, + airdrop: &ProtoMintRandomQueued, + ) -> Result<()> { + let path = path.as_ref(); + let bytes = airdrop.encode_to_vec(); + + self.0 + .db + .put_cf_opt( + &self.0.get_cf(Self::CF_NAME)?, + key.as_bytes(), + bytes, + self.0.config.write_opts(), + ) + .with_context(|| format!("Error setting for {path:?}")) + } +} + #[repr(transparent)] pub struct DropMintCache<'a>(Cow<'a, Cache>); diff --git a/src/cli.rs b/src/cli.rs index 312d4c5..a616aef 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -84,6 +84,7 @@ pub enum Subcommand { Config(Config), /// Upload files to Hub Upload(Upload), + Airdrop(Airdrop), } /// Options for hub config @@ -150,6 +151,25 @@ pub struct Upload { pub subcmd: UploadSubcommand, } +/// Options for hub upload +#[derive(clap::Args)] +pub struct Airdrop { + #[arg(short = 'd', long = "drop")] + pub drop_id: Uuid, + + #[arg(short = 'w')] + pub wallets: PathBuf, + + #[arg(short = 'c', default_value = "true")] + pub compressed: bool, + + #[arg(short = 'n', default_value = "1")] + pub mints_per_wallet: u32, + + #[arg(short = 'j', default_value = "8")] + pub jobs: usize, +} + /// Subcommands for hub upload #[derive(clap::Subcommand)] pub enum UploadSubcommand { diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs new file mode 100644 index 0000000..7c13155 --- /dev/null +++ b/src/commands/airdrop.rs @@ -0,0 +1,367 @@ +use core::fmt; +use std::{ + borrow::Cow, + fs::File, + io, + io::BufRead, + path::{Path, PathBuf}, + thread::sleep, + time::Duration, +}; + +use anyhow::{bail, Context as _, Result}; +use crossbeam::channel::{self, Sender}; +use futures_util::FutureExt; +use graphql_client::GraphQLQuery; +use log::{error, info, warn}; +use tokio::task::JoinHandle; +use url::Url; + +use crate::{ + cache::{AirdropWalletsCache, Cache, CacheConfig, CreationStatus, ProtoMintRandomQueued}, + cli::Airdrop, + commands::airdrop::mint_random_queued_to_drop::{ + MintRandomQueuedInput, MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint, + }, + common::{concurrent, error::format_errors, reqwest::ResponseExt, tokio::runtime}, + config::Config, +}; + +#[allow(clippy::upper_case_acronyms)] +type UUID = uuid::Uuid; + +struct Pubkey([u8; 32]); + +impl fmt::Display for Pubkey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", bs58::encode(&self.0).into_string()) + } +} + +impl TryFrom<&str> for Pubkey { + type Error = anyhow::Error; + + fn try_from(s: &str) -> Result { + let mut pubkey_bytes: [u8; 32] = [0; 32]; + bs58::decode(&s) + .onto(&mut pubkey_bytes) + .context("failed to parse string as pubkey") + .map(|_| Self(pubkey_bytes)) + } +} + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "src/queries/schema.graphql", + query_path = "src/queries/mint-random.graphql", + response_derives = "Debug" +)] +struct MintRandomQueuedToDrop; + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "src/queries/schema.graphql", + query_path = "src/queries/mint-status.graphql", + response_derives = "Debug" +)] +struct MintStatus; + +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum Job { + Mint(MintRandomQueued), + CheckStatus(CheckMintStatus), + // RetryFailed(RetryFailedMint), +} + +impl Job { + #[inline] + fn run(self, ctx: Context) -> JoinHandle> { + match self { + Job::Mint(j) => j.run(ctx), + Job::CheckStatus(j) => j.run(ctx), + } + } +} + +#[derive(Debug)] +struct CheckMintStatus { + path: PathBuf, + mint_id: UUID, + key: Cow<'static, str>, +} + +impl CheckMintStatus { + fn run(self, ctx: Context) -> JoinHandle> { + tokio::spawn(async move { + let Self { path, mint_id, key } = self; + let cache: AirdropWalletsCache = ctx.cache.get().await?; + let wallet = key.split_once(':').unwrap().0; + + let input = mint_status::Variables { id: mint_id }; + + let res = ctx + .client + .post(ctx.graphql_endpoint) + .json(&MintStatus::build_query(input)) + .send() + .await + .error_for_hub_status(|| format!("check mint status mutation for {path:?}"))? + .json::::ResponseData>>() + .await + .with_context(|| format!("Error parsing mutation response for {path:?}"))?; + + if let Some(data) = res.data { + format_errors(res.errors, (), |s| { + warn!("checkMintStatus mutation for {path:?} returned one or more errors:{s}"); + }); + + info!("Checking status for {mint_id:?}"); + + let response = data.mint.context("Mint not found")?; + let mint_status::MintStatusMint { + id, + creation_status, + } = response; + + // impl From trait + + match creation_status { + mint_status::CreationStatus::CREATED => { + info!("Mint {mint_id:?} airdropped to {wallet:?}"); + cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { + mint_id: id.to_string(), + mint_address: None, + status: CreationStatus::Created.into(), + })?; + }, + mint_status::CreationStatus::PENDING => { + sleep(Duration::from_secs(3)); + ctx.q.send(Job::CheckStatus(CheckMintStatus { + path: path.clone(), + mint_id: id, + key: key.clone(), + }))?; + }, + _ => { + info!("Mint {mint_id:?}:? failed. retrying.."); + cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { + mint_id: id.to_string(), + mint_address: None, + status: CreationStatus::Failed.into(), + })?; + }, + } + } else { + format_errors(res.errors, Ok(()), |s| { + bail!("checkMintStatus mutation for {path:?} returned one or more errors:{s}") + })?; + + bail!("CheckMintStatus mutation for {path:?} returned no data"); + } + + Ok(()) + }) + } +} + +#[derive(Debug)] +struct MintRandomQueued { + path: PathBuf, + key: Cow<'static, str>, + drop_id: UUID, + compressed: bool, +} + +impl MintRandomQueued { + fn run(self, ctx: Context) -> JoinHandle> { + tokio::task::spawn(async move { + let Self { + path, + key, + drop_id, + compressed, + } = self; + let cache: AirdropWalletsCache = ctx.cache.get().await?; + + let record = cache.get_sync(path.clone(), key.to_string())?; + if let Some(r) = record { + let status = + CreationStatus::try_from(r.status).context("invalid creation status")?; + let wallet = key.split_once(':').unwrap().0; + match status { + CreationStatus::Created => { + info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,); + return Ok(()); + }, + CreationStatus::Failed | CreationStatus::Pending => { + info!( + "Mint {:?} status = {:?}. Checking status again...", + r.mint_id, + status.as_str_name() + ); + ctx.q.send(Job::CheckStatus(CheckMintStatus { + path, + mint_id: r.mint_id.parse()?, + key, + }))?; + }, + } + } else { + let (wallet, _) = key.split_once(':').unwrap(); + let input = mint_random_queued_to_drop::Variables { + in_: MintRandomQueuedInput { + drop: drop_id, + recipient: wallet.to_string(), + compressed, + }, + }; + + let res = + ctx.client + .post(ctx.graphql_endpoint) + .json(&MintRandomQueuedToDrop::build_query(input)) + .send() + .await + .error_for_hub_status(|| format!("queueMintToDrop mutation for {path:?}"))? + .json::::ResponseData, + >>() + .await + .with_context(|| format!("Error parsing mutation response for {path:?}"))?; + + log::trace!("GraphQL response for {path:?}: {res:?}"); + + if let Some(data) = res.data { + format_errors(res.errors, (), |s| { + warn!( + "mintRandomQueuedToDrop mutation for {path:?} returned one or more \ + errors:{s}" + ); + }); + + let mint_random_queued_to_drop::ResponseData { + mint_random_queued_to_drop: + mint_random_queued_to_drop::MintRandomQueuedToDropMintRandomQueuedToDrop { + collection_mint: + MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint { + id, + .. + }, + }, + } = data; + + cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { + mint_id: id.to_string(), + mint_address: None, + status: CreationStatus::Pending.into(), + })?; + + ctx.q.send(Job::CheckStatus(CheckMintStatus { + path, + mint_id: id, + key: key.clone(), + }))?; + + info!("Pending for {:?}", wallet); + } else { + format_errors(res.errors, Ok(()), |s| { + bail!( + "mintRandomQueuedToDrop mutation for {path:?} returned one or more \ + errors:{s}" + ) + })?; + + bail!("mintRandomQueuedToDrop mutation for {path:?} returned no data"); + } + } + + Ok(()) + }) + } +} + +#[derive(Clone)] +struct Context { + graphql_endpoint: Url, + client: reqwest::Client, + q: Sender, + cache: Cache, +} + +pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { + let mut any_errs = false; + + let Airdrop { + drop_id, + wallets, + compressed, + mints_per_wallet, + jobs, + } = args; + + let (tx, rx) = channel::unbounded(); + + let input_file = File::open(&wallets) + .with_context(|| format!("wallets file does not exist {:?}", wallets))?; + let reader = io::BufReader::new(input_file); + + for line in reader.lines() { + let line = line?; + let pubkey: Pubkey = line.trim_end_matches('\n').try_into()?; + + let mut nft_number = 1; + while nft_number <= mints_per_wallet { + let key: String = format!("{pubkey}:{nft_number}"); + tx.send(Job::Mint(MintRandomQueued { + key: Cow::Owned(key), + drop_id, + path: drop_id.to_string().into(), + compressed, + })) + .context("Error seeding initial job queue")?; + nft_number += 1; + } + } + + let ctx = Context { + graphql_endpoint: config.graphql_endpoint().clone(), + client: config.graphql_client()?, + q: tx, + cache: Cache::load_sync(Path::new(".airdrops").join(drop_id.to_string()), cache)?, + }; + + runtime()?.block_on(async move { + let res = concurrent::try_run( + jobs, + |e| { + error!("{e:?}"); + any_errs = true; + }, + || { + let job = match rx.try_recv() { + Ok(j) => Some(j), + Err(channel::TryRecvError::Empty) => None, + Err(e) => return Err(e).context("Error getting job from queue"), + }; + + let Some(job) = job else { + return Ok(None); + }; + + log::trace!("Submitting job: {job:?}"); + + Ok(Some(job.run(ctx.clone()).map(|f| { + f.context("Worker task panicked").and_then(|r| r) + }))) + }, + ) + .await; + + debug_assert!(rx.is_empty(), "Trailing jobs in queue"); + + res + })?; + + Ok(()) +} diff --git a/src/commands/upload_drop.rs b/src/commands/upload_drop.rs index 6576216..e7b7609 100644 --- a/src/commands/upload_drop.rs +++ b/src/commands/upload_drop.rs @@ -1,7 +1,6 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, - fmt::Write, fs::{self, File}, io::{self, prelude::*}, iter, @@ -34,6 +33,7 @@ use crate::{ cli::UploadDrop, common::{ concurrent, + error::format_errors, metadata_json::{self, MetadataJson}, reqwest::ResponseExt, tokio::runtime, @@ -566,26 +566,6 @@ struct QueueJsonJob { } impl QueueJsonJob { - fn format_errors( - errors: Option>, - ok: T, - f: impl FnOnce(String) -> T, - ) -> T { - let mut errs = errors.into_iter().flatten().peekable(); - - if errs.peek().is_some() { - let mut s = String::new(); - - for err in errs { - write!(s, "\n {err}").unwrap(); - } - - f(s) - } else { - ok - } - } - fn rewrite_json(json: &mut MetadataJson, rewrites: Option>>) { let rewrites: HashMap<_, _> = rewrites .into_iter() @@ -685,7 +665,7 @@ impl QueueJsonJob { let collection_mint; if let Some(data) = res.data { - Self::format_errors(res.errors, (), |s| { + format_errors(res.errors, (), |s| { warn!( "queueMintToDrop mutation for {path:?} returned one or more errors:{s}" ); @@ -705,7 +685,7 @@ impl QueueJsonJob { ctx.stats.queued_mints.fetch_add(1, Ordering::Relaxed); info!("Mint successfully queued for {path:?}"); } else { - Self::format_errors(res.errors, Ok(()), |s| { + format_errors(res.errors, Ok(()), |s| { bail!( "queueMintToDrop mutation for {path:?} returned one or more errors:{s}" ) diff --git a/src/common/error.rs b/src/common/error.rs new file mode 100644 index 0000000..c1fd966 --- /dev/null +++ b/src/common/error.rs @@ -0,0 +1,20 @@ +pub(crate) fn format_errors( + errors: Option>, + ok: T, + f: impl FnOnce(String) -> T, +) -> T { + let mut errs = errors.into_iter().flatten().peekable(); + + if errs.peek().is_some() { + let mut s = String::new(); + + for err in errs { + write!(s, "\n {err}").unwrap(); + } + + f(s) + } else { + ok + } +} +use std::fmt::Write; diff --git a/src/main.rs b/src/main.rs index 697c47d..377af0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,12 +16,14 @@ mod cli; mod config; mod commands { + pub mod airdrop; pub mod config; pub mod upload_drop; } mod common { pub mod concurrent; + pub mod error; pub mod metadata_json; pub mod reqwest; pub mod tokio; @@ -45,7 +47,7 @@ mod entry { use crate::{ cache::CacheConfig, cli::{log_color, Opts, Subcommand, UploadSubcommand}, - commands::{config, upload_drop}, + commands::{airdrop, config, upload_drop}, config::{Config, ConfigLocation}, }; @@ -69,13 +71,14 @@ mod entry { .init(); let config = |w| ConfigLocation::new(config, w); - let cache = CacheConfig::new(cache); + let cache: CacheConfig = CacheConfig::new(cache); match subcmd { Subcommand::Config(c) => config::run(&config(true)?, c), Subcommand::Upload(u) => match u.subcmd { UploadSubcommand::Drop(d) => upload_drop::run(&read(config)?, cache, d), }, + Subcommand::Airdrop(a) => airdrop::run(&read(config)?, cache, a), } } } diff --git a/src/proto/mint-random-queued.proto b/src/proto/mint-random-queued.proto new file mode 100644 index 0000000..451db01 --- /dev/null +++ b/src/proto/mint-random-queued.proto @@ -0,0 +1,15 @@ +syntax = 'proto3'; + +package cache.mint_random_queued; + +message MintRandomQueued { + string mint_id = 1; + optional string mint_address = 2; + CreationStatus status = 3; +} + +enum CreationStatus { + Created = 0; + Failed = 1; + Pending = 2; +} \ No newline at end of file diff --git a/src/queries/mint-random.graphql b/src/queries/mint-random.graphql new file mode 100644 index 0000000..d8b1ce8 --- /dev/null +++ b/src/queries/mint-random.graphql @@ -0,0 +1,8 @@ +mutation MintRandomQueuedToDrop($in: MintRandomQueuedInput!) { + mintRandomQueuedToDrop(input: $in) { + collectionMint { + id + creationStatus + } + } +} diff --git a/src/queries/mint-status.graphql b/src/queries/mint-status.graphql new file mode 100644 index 0000000..93f635c --- /dev/null +++ b/src/queries/mint-status.graphql @@ -0,0 +1,6 @@ +query MintStatus($id: UUID!) { + mint(id: $id) { + id + creationStatus + } +} From a6a3c1b31e9bef8171153de5923ada61d1b5a730 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 11:51:57 +0500 Subject: [PATCH 2/6] use default_value_t --- src/cli.rs | 6 +++--- src/commands/airdrop.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index a616aef..cd1d3cc 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -160,13 +160,13 @@ pub struct Airdrop { #[arg(short = 'w')] pub wallets: PathBuf, - #[arg(short = 'c', default_value = "true")] + #[arg(short = 'c', default_value_t = true)] pub compressed: bool, - #[arg(short = 'n', default_value = "1")] + #[arg(short = 'n', default_value_t = 1)] pub mints_per_wallet: u32, - #[arg(short = 'j', default_value = "8")] + #[arg(short = 'j', default_value_t = 8)] pub jobs: usize, } diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs index 7c13155..207ed3b 100644 --- a/src/commands/airdrop.rs +++ b/src/commands/airdrop.rs @@ -116,7 +116,7 @@ impl CheckMintStatus { warn!("checkMintStatus mutation for {path:?} returned one or more errors:{s}"); }); - info!("Checking status for {mint_id:?}"); + info!("Checking status for mint {mint_id:?}"); let response = data.mint.context("Mint not found")?; let mint_status::MintStatusMint { @@ -263,7 +263,7 @@ impl MintRandomQueued { key: key.clone(), }))?; - info!("Pending for {:?}", wallet); + info!("Pending for wallet {wallet:?}"); } else { format_errors(res.errors, Ok(()), |s| { bail!( From 6b7039b2ebbea5dcda4e064ea7de056a204feac3 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 20:32:09 +0500 Subject: [PATCH 3/6] basic stats --- src/commands/airdrop.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs index 207ed3b..b586c65 100644 --- a/src/commands/airdrop.rs +++ b/src/commands/airdrop.rs @@ -5,6 +5,10 @@ use std::{ io, io::BufRead, path::{Path, PathBuf}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, thread::sleep, time::Duration, }; @@ -84,6 +88,13 @@ impl Job { } } +#[derive(Default)] +struct Stats { + pending_mints: AtomicUsize, + failed_mints: AtomicUsize, + created_mints: AtomicUsize, +} + #[derive(Debug)] struct CheckMintStatus { path: PathBuf, @@ -129,6 +140,9 @@ impl CheckMintStatus { match creation_status { mint_status::CreationStatus::CREATED => { info!("Mint {mint_id:?} airdropped to {wallet:?}"); + ctx.stats.pending_mints.fetch_sub(1, Ordering::Relaxed); + ctx.stats.created_mints.fetch_add(1, Ordering::Relaxed); + cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { mint_id: id.to_string(), mint_address: None, @@ -144,6 +158,7 @@ impl CheckMintStatus { }))?; }, _ => { + ctx.stats.failed_mints.fetch_add(1, Ordering::Relaxed); info!("Mint {mint_id:?}:? failed. retrying.."); cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { mint_id: id.to_string(), @@ -191,10 +206,13 @@ impl MintRandomQueued { let wallet = key.split_once(':').unwrap().0; match status { CreationStatus::Created => { + ctx.stats.created_mints.fetch_add(1, Ordering::Relaxed); info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,); return Ok(()); }, CreationStatus::Failed | CreationStatus::Pending => { + ctx.stats.pending_mints.fetch_add(1, Ordering::Relaxed); + info!( "Mint {:?} status = {:?}. Checking status again...", r.mint_id, @@ -257,6 +275,8 @@ impl MintRandomQueued { status: CreationStatus::Pending.into(), })?; + ctx.stats.pending_mints.fetch_add(1, Ordering::Relaxed); + ctx.q.send(Job::CheckStatus(CheckMintStatus { path, mint_id: id, @@ -272,6 +292,8 @@ impl MintRandomQueued { ) })?; + ctx.stats.failed_mints.fetch_add(1, Ordering::Relaxed); + bail!("mintRandomQueuedToDrop mutation for {path:?} returned no data"); } } @@ -287,6 +309,7 @@ struct Context { client: reqwest::Client, q: Sender, cache: Cache, + stats: Arc, } pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { @@ -329,6 +352,7 @@ pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { client: config.graphql_client()?, q: tx, cache: Cache::load_sync(Path::new(".airdrops").join(drop_id.to_string()), cache)?, + stats: Arc::default(), }; runtime()?.block_on(async move { @@ -360,6 +384,18 @@ pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { debug_assert!(rx.is_empty(), "Trailing jobs in queue"); + let Stats { + created_mints, + pending_mints, + failed_mints, + } = &*ctx.stats; + info!( + "Created: {:?} Pending: {:?} Failed: {:?}", + created_mints.load(Ordering::Relaxed), + pending_mints.load(Ordering::Relaxed), + failed_mints.load(Ordering::Relaxed), + ); + res })?; From 2474b4ce539320d253e64097565f2a66df6f457d Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Wed, 4 Oct 2023 21:18:53 +0500 Subject: [PATCH 4/6] stats for only new mints --- src/commands/airdrop.rs | 15 +++++++-------- src/common/error.rs | 3 ++- src/main.rs | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs index b586c65..768052b 100644 --- a/src/commands/airdrop.rs +++ b/src/commands/airdrop.rs @@ -206,18 +206,17 @@ impl MintRandomQueued { let wallet = key.split_once(':').unwrap().0; match status { CreationStatus::Created => { - ctx.stats.created_mints.fetch_add(1, Ordering::Relaxed); info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,); return Ok(()); }, - CreationStatus::Failed | CreationStatus::Pending => { - ctx.stats.pending_mints.fetch_add(1, Ordering::Relaxed); + CreationStatus::Failed => { + //retry here - info!( - "Mint {:?} status = {:?}. Checking status again...", - r.mint_id, - status.as_str_name() - ); + info!("Mint {:?} failed. retrying..", r.mint_id,); + }, + + CreationStatus::Pending => { + info!("Mint {:?} is pending. Checking status again...", r.mint_id,); ctx.q.send(Job::CheckStatus(CheckMintStatus { path, mint_id: r.mint_id.parse()?, diff --git a/src/common/error.rs b/src/common/error.rs index c1fd966..d34ff72 100644 --- a/src/common/error.rs +++ b/src/common/error.rs @@ -1,3 +1,5 @@ +use std::fmt::Write; + pub(crate) fn format_errors( errors: Option>, ok: T, @@ -17,4 +19,3 @@ pub(crate) fn format_errors( ok } } -use std::fmt::Write; diff --git a/src/main.rs b/src/main.rs index 377af0c..23556be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -71,7 +71,7 @@ mod entry { .init(); let config = |w| ConfigLocation::new(config, w); - let cache: CacheConfig = CacheConfig::new(cache); + let cache = CacheConfig::new(cache); match subcmd { Subcommand::Config(c) => config::run(&config(true)?, c), From abe854f33580d709f34f4179c415576befdded95 Mon Sep 17 00:00:00 2001 From: raykast Date: Wed, 4 Oct 2023 21:44:24 -0700 Subject: [PATCH 5/6] Fixups to airdrop feature branch. --- .rustfmt.toml | 2 +- Cargo.lock | 53 ++- Cargo.toml | 3 +- src/cache.rs | 371 +++++++++--------- src/cli.rs | 63 +-- src/commands/airdrop.rs | 594 ++++++++++++++--------------- src/commands/upload_drop.rs | 186 ++++----- src/common/concurrent.rs | 53 ++- src/common/error.rs | 21 - src/common/graphql.rs | 63 +++ src/common/pubkey.rs | 43 +++ src/common/reqwest.rs | 53 ++- src/common/stats.rs | 10 + src/common/url_permissive.rs | 3 +- src/main.rs | 6 +- src/proto/mint-random-queued.proto | 8 +- 16 files changed, 870 insertions(+), 662 deletions(-) delete mode 100644 src/common/error.rs create mode 100644 src/common/graphql.rs create mode 100644 src/common/pubkey.rs create mode 100644 src/common/stats.rs diff --git a/.rustfmt.toml b/.rustfmt.toml index 51cab9a..a7cecf1 100644 --- a/.rustfmt.toml +++ b/.rustfmt.toml @@ -33,7 +33,7 @@ imports_granularity = "Crate" group_imports = "StdExternalCrate" reorder_imports = true reorder_modules = true -reorder_impl_items = true +reorder_impl_items = false type_punctuation_density = "Wide" space_before_colon = false space_after_colon = true diff --git a/Cargo.lock b/Cargo.lock index 24c0df0..8f7d0d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -121,6 +121,18 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backon" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -564,6 +576,15 @@ dependencies = [ "str-buf", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -806,6 +827,7 @@ name = "holaplex-hub-cli" version = "0.1.0" dependencies = [ "anyhow", + "backon", "bs58", "clap", "crossbeam", @@ -960,6 +982,15 @@ dependencies = [ "cfb", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -1319,6 +1350,26 @@ dependencies = [ "indexmap 2.0.2", ] +[[package]] +name = "pin-project" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1797,7 +1848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall 0.3.5", "rustix", "windows-sys", diff --git a/Cargo.toml b/Cargo.toml index ab941f1..e0c1291 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ path = "src/main.rs" [dependencies] anyhow = "1.0.75" +backon = "0.4.1" +bs58 = "0.5.0" clap = { version = "4.4.4", features = ["cargo", "derive", "env", "wrap_help"] } crossbeam = "0.8.2" dashmap = "5.5.3" @@ -36,7 +38,6 @@ toml = { version = "0.8.0", features = ["preserve_order"] } url = { version = "2.4.1", features = ["serde"] } uuid = { version = "1.4.1", features = ["serde"] } xxhash-rust = { version = "0.8.7", features = ["xxh3", "const_xxh3"] } -bs58 = "0.5" [build-dependencies] prost-build = "0.12.1" diff --git a/src/cache.rs b/src/cache.rs index b336dd1..ff8b815 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,7 +1,8 @@ use std::{ - borrow::Cow, + borrow::{Borrow, Cow}, fmt, io::{self, prelude::*}, + marker::PhantomData, path::{Path, PathBuf}, pin::pin, sync::Arc, @@ -17,15 +18,17 @@ use tokio::{ }; use xxhash_rust::xxh3::Xxh3; -use crate::cli::CacheOpts; +use crate::{cli::CacheOpts, common::pubkey::Pubkey}; mod proto { + #![allow(clippy::doc_markdown, clippy::trivially_copy_pass_by_ref)] + include!(concat!(env!("OUT_DIR"), "/cache.asset_uploads.rs")); include!(concat!(env!("OUT_DIR"), "/cache.drop_mints.rs")); include!(concat!(env!("OUT_DIR"), "/cache.mint_random_queued.rs")); } -pub use proto::{AssetUpload, CreationStatus, DropMint, MintRandomQueued as ProtoMintRandomQueued}; +pub use proto::{AssetUpload, CreationStatus, DropMint, MintRandomQueued}; #[derive(Clone, Copy, PartialEq, Eq)] #[repr(transparent)] @@ -197,21 +200,26 @@ mod private { #[allow(clippy::wildcard_imports)] use super::*; - pub trait CacheFamily<'a> { - type Static: CacheFamily<'static>; + pub trait CacheFamily: Send + 'static { + type KeyName: ?Sized + fmt::Debug + ToOwned; + type OwnedKeyName: fmt::Debug + Borrow + Send + 'static; - const CF_NAME: &'static str; + type Key: ?Sized + ToOwned; + type OwnedKey: Borrow + Send + 'static; + type KeyBytes<'a>: AsRef<[u8]>; - fn new(cache: Cow<'a, Cache>) -> Self; + type Value: ?Sized + ToOwned; + type OwnedValue: Borrow + Send + 'static; + type ValueBytes<'a>: AsRef<[u8]>; - fn cache(&self) -> &Cow<'a, Cache>; + type ParseError: Into; - fn to_static(&self) -> Self::Static { - Self::Static::new(Cow::Owned(self.cache().as_ref().to_owned())) - } + const CF_NAME: &'static str; + const VALUE_NAME: &'static str; - #[inline] - fn get_cf(&'a self) -> Result { self.cache().get_cf(Self::CF_NAME) } + fn key_bytes(key: &Self::Key) -> Self::KeyBytes<'_>; + fn value_bytes(value: &Self::Value) -> Self::ValueBytes<'_>; + fn parse_value(bytes: Vec) -> Result; } } @@ -224,9 +232,11 @@ pub struct Cache { db: Arc, } -// NB: Don't use the async versions unless you absolutely have to, they're -// going to all be slower. +// NB: The async methods all have a bunch of overhead - they are all analogous +// to calling the blocking versions inside spawn_blocking, and should only +// be used in contexts where spawn_blocking would be otherwise required. +#[allow(dead_code)] impl Cache { pub async fn load( path: impl AsRef + Send + 'static, @@ -283,261 +293,260 @@ impl Cache { }) } - pub async fn get + Send + 'static>(&self) -> Result { + pub async fn get(&self) -> Result> { let this = self.clone(); spawn_blocking(move || { this.create_cf(C::CF_NAME)?; - Ok(C::new(Cow::Owned(this))) + Ok(CacheRef(Cow::Owned(this), PhantomData)) }) .await .unwrap() } - pub fn get_sync<'a, C: CacheFamily<'a>>(&'a self) -> Result { + pub fn get_sync(&self) -> Result> { self.create_cf(C::CF_NAME)?; - Ok(C::new(Cow::Borrowed(self))) + Ok(CacheRef(Cow::Borrowed(self), PhantomData)) } } #[repr(transparent)] -pub struct AssetUploadCache<'a>(Cow<'a, Cache>); - -impl<'a> CacheFamily<'a> for AssetUploadCache<'a> { - type Static = AssetUploadCache<'static>; - - const CF_NAME: &'static str = "asset-uploads"; +pub struct CacheRef<'a, F>(Cow<'a, Cache>, PhantomData); +#[allow(dead_code)] +impl<'a, F: CacheFamily> CacheRef<'a, F> { #[inline] - fn new(cache: Cow<'a, Cache>) -> Self { Self(cache) } + fn to_static(&self) -> CacheRef<'static, F> { + CacheRef(Cow::Owned(self.0.as_ref().to_owned()), PhantomData) + } #[inline] - fn cache(&self) -> &Cow<'a, Cache> { &self.0 } -} + fn get_cf(&'a self) -> Result { self.0.get_cf(F::CF_NAME) } -impl<'a> AssetUploadCache<'a> { - pub async fn get( + #[inline] + pub async fn get_named( &self, - path: impl AsRef + Send + 'static, - ck: Checksum, - ) -> Result> { + name: F::OwnedKeyName, + key: F::OwnedKey, + ) -> Result> { let this = self.to_static(); - spawn_blocking(move || this.get_sync(path, ck)) + spawn_blocking(move || this.get_named_sync(name, key)) .await .unwrap() } - pub fn get_sync(&self, path: impl AsRef, ck: Checksum) -> Result> { - let path = path.as_ref(); - + pub fn get_named_sync( + &self, + name: impl Borrow, + key: impl Borrow, + ) -> Result> { + let name = name.borrow(); let bytes = self .0 .db .get_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - ck.to_bytes(), + &self.get_cf()?, + F::key_bytes(key.borrow()), self.0.config.read_opts(), ) - .with_context(|| format!("Error getting asset upload cache for {path:?}"))?; + .with_context(|| format!("Error getting {} for {name:?}", F::VALUE_NAME))?; let Some(bytes) = bytes else { return Ok(None) }; - let upload = AssetUpload::decode(&*bytes).with_context(|| { - format!("Error parsing asset upload cache for {path:?} (this should not happen)") + let parsed = F::parse_value(bytes).map_err(Into::into).with_context(|| { + format!( + "Error parsing {} for {name:?} (this should not happen)", + F::VALUE_NAME + ) })?; - Ok(Some(upload)) + Ok(Some(parsed)) } - pub async fn set( + #[inline] + pub async fn set_named( &self, - path: impl AsRef + Send + 'static, - ck: Checksum, - upload: AssetUpload, + name: F::OwnedKeyName, + key: F::OwnedKey, + value: F::OwnedValue, ) -> Result<()> { let this = self.to_static(); - spawn_blocking(move || this.set_sync(path, ck, &upload)) + spawn_blocking(move || this.set_named_sync(name, key, value)) .await .unwrap() } - pub fn set_sync( + #[inline] + pub fn set_named_sync( &self, - path: impl AsRef, - ck: Checksum, - upload: &AssetUpload, + name: impl Borrow, + key: impl Borrow, + value: impl Borrow, ) -> Result<()> { - let path = path.as_ref(); - let bytes = upload.encode_to_vec(); + let name = name.borrow(); + let bytes = F::value_bytes(value.borrow()); self.0 .db .put_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - ck.to_bytes(), - bytes, + &self.get_cf()?, + F::key_bytes(key.borrow()).as_ref(), + bytes.as_ref(), self.0.config.write_opts(), ) - .with_context(|| format!("Error setting asset upload cache for {path:?}")) + .with_context(|| format!("Error setting {} for {name:?}", F::VALUE_NAME)) } } -#[repr(transparent)] -pub struct AirdropWalletsCache<'a>(Cow<'a, Cache>); - -impl<'a> CacheFamily<'a> for AirdropWalletsCache<'a> { - type Static = AirdropWalletsCache<'static>; +#[allow(dead_code)] +impl<'a, F: CacheFamily> CacheRef<'a, F> +where F::Key: Borrow +{ + #[inline] + pub fn get_sync(&self, key: impl Borrow) -> Result> { + let key = key.borrow(); + self.get_named_sync(key.borrow(), key) + } - const CF_NAME: &'static str = "airdrop-wallets"; + #[inline] + pub fn set_sync(&self, key: impl Borrow, value: impl Borrow) -> Result<()> { + let key = key.borrow(); + self.set_named_sync(key.borrow(), key, value) + } +} +#[allow(dead_code)] +impl<'a, F: CacheFamily> CacheRef<'a, F> +where F::OwnedKey: ToOwned +{ #[inline] - fn new(cache: Cow<'a, Cache>) -> Self { Self(cache) } + pub async fn get(&self, key: F::OwnedKey) -> Result> { + self.get_named(key.to_owned(), key).await + } #[inline] - fn cache(&self) -> &Cow<'a, Cache> { &self.0 } + pub async fn set(&self, key: F::OwnedKey, value: F::OwnedValue) -> Result<()> { + self.set_named(key.to_owned(), key, value).await + } } -impl<'a> AirdropWalletsCache<'a> { - pub async fn get( - &self, - path: impl AsRef + Send + 'static, - pubkey: String, - ) -> Result> { - let this = self.to_static(); - spawn_blocking(move || this.get_sync(path, pubkey)) - .await - .unwrap() - } +pub struct AirdropWalletsCacheFamily; +pub type AirdropWalletsCache<'a> = CacheRef<'a, AirdropWalletsCacheFamily>; - pub fn get_sync( - &self, - path: impl AsRef, - key: String, - ) -> Result> { - let path = path.as_ref(); +#[derive(Clone, Copy)] +pub struct AirdropId { + pub wallet: Pubkey, + pub nft_number: u32, +} - let bytes = self - .0 - .db - .get_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - key.into_bytes(), - self.0.config.read_opts(), - ) - .with_context(|| format!("Error getting airdrop wallet for {path:?}"))?; +impl AirdropId { + pub fn to_bytes(self) -> [u8; 36] { + let mut buf = [0_u8; 36]; - let Some(bytes) = bytes else { return Ok(None) }; - let airdrop = ProtoMintRandomQueued::decode(&*bytes) - .with_context(|| format!("Error parsing for {path:?} (this should not happen)"))?; + let (bw, bn) = buf.split_at_mut(32); + bw.copy_from_slice(&self.wallet.to_bytes()); + bn.copy_from_slice(&self.nft_number.to_le_bytes()); - Ok(Some(airdrop)) + buf } +} - pub async fn set( - &self, - path: impl AsRef + Send + 'static, - key: &'static str, - airdrop: ProtoMintRandomQueued, - ) -> Result<()> { - let this = self.to_static(); - spawn_blocking(move || this.set_sync(path, key, &airdrop)) - .await - .unwrap() +impl fmt::Debug for AirdropId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { wallet, nft_number } = self; + write!(f, "NFT #{nft_number} for {wallet}") } +} - pub fn set_sync( - &self, - path: impl AsRef, - key: &str, - airdrop: &ProtoMintRandomQueued, - ) -> Result<()> { - let path = path.as_ref(); - let bytes = airdrop.encode_to_vec(); +impl CacheFamily for AirdropWalletsCacheFamily { + type KeyName = AirdropId; + type OwnedKeyName = AirdropId; - self.0 - .db - .put_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - key.as_bytes(), - bytes, - self.0.config.write_opts(), - ) - .with_context(|| format!("Error setting for {path:?}")) + type Key = AirdropId; + type OwnedKey = AirdropId; + type KeyBytes<'a> = [u8; 36]; + + type Value = MintRandomQueued; + type OwnedValue = MintRandomQueued; + type ValueBytes<'a> = Vec; + + type ParseError = prost::DecodeError; + + const CF_NAME: &'static str = "airdrop-wallets"; + const VALUE_NAME: &'static str = "airdrop mint status"; + + #[inline] + fn key_bytes(key: &Self::Key) -> Self::KeyBytes<'_> { key.to_bytes() } + + #[inline] + fn value_bytes(value: &Self::Value) -> Self::ValueBytes<'_> { value.encode_to_vec() } + + #[inline] + fn parse_value(bytes: Vec) -> Result { + Self::OwnedValue::decode(&*bytes) } } -#[repr(transparent)] -pub struct DropMintCache<'a>(Cow<'a, Cache>); +pub struct AssetUploadCacheFamily; +pub type AssetUploadCache<'a> = CacheRef<'a, AssetUploadCacheFamily>; -impl<'a> CacheFamily<'a> for DropMintCache<'a> { - type Static = DropMintCache<'static>; +impl CacheFamily for AssetUploadCacheFamily { + type KeyName = Path; + type OwnedKeyName = PathBuf; - const CF_NAME: &'static str = "drop-mints"; + type Key = Checksum; + type OwnedKey = Checksum; + type KeyBytes<'a> = [u8; 16]; + + type Value = AssetUpload; + type OwnedValue = AssetUpload; + type ValueBytes<'a> = Vec; + + type ParseError = prost::DecodeError; + + const CF_NAME: &'static str = "asset-uploads"; + const VALUE_NAME: &'static str = "asset upload"; #[inline] - fn new(cache: Cow<'a, Cache>) -> Self { Self(cache) } + fn key_bytes(key: &Self::Key) -> Self::KeyBytes<'_> { key.to_bytes() } #[inline] - fn cache(&self) -> &Cow<'a, Cache> { &self.0 } -} + fn value_bytes(value: &Self::Value) -> Self::ValueBytes<'_> { value.encode_to_vec() } -impl<'a> DropMintCache<'a> { - pub async fn get( - &self, - path: impl AsRef + Send + 'static, - ck: Checksum, - ) -> Result> { - let this = self.to_static(); - spawn_blocking(move || this.get_sync(path, ck)) - .await - .unwrap() + #[inline] + fn parse_value(bytes: Vec) -> Result { + Self::OwnedValue::decode(&*bytes) } +} - pub fn get_sync(&self, path: impl AsRef, ck: Checksum) -> Result> { - let path = path.as_ref(); +pub struct DropMintCacheFamily; +pub type DropMintCache<'a> = CacheRef<'a, DropMintCacheFamily>; - let bytes = self - .0 - .db - .get_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - ck.to_bytes(), - self.0.config.read_opts(), - ) - .with_context(|| format!("Error getting asset upload cache for {path:?}"))?; +impl CacheFamily for DropMintCacheFamily { + type KeyName = Path; + type OwnedKeyName = PathBuf; - let Some(bytes) = bytes else { return Ok(None) }; - let upload = DropMint::decode(&*bytes).with_context(|| { - format!("Error parsing asset upload cache for {path:?} (this should not happen)") - })?; + type Key = Checksum; + type OwnedKey = Checksum; + type KeyBytes<'a> = [u8; 16]; - Ok(Some(upload)) - } + type Value = DropMint; + type OwnedValue = DropMint; + type ValueBytes<'a> = Vec; - pub async fn set( - &self, - path: impl AsRef + Send + 'static, - ck: Checksum, - upload: DropMint, - ) -> Result<()> { - let this = self.to_static(); - spawn_blocking(move || this.set_sync(path, ck, &upload)) - .await - .unwrap() - } + type ParseError = prost::DecodeError; - pub fn set_sync(&self, path: impl AsRef, ck: Checksum, upload: &DropMint) -> Result<()> { - let path = path.as_ref(); - let bytes = upload.encode_to_vec(); + const CF_NAME: &'static str = "drop-mints"; + const VALUE_NAME: &'static str = "drop mint"; - self.0 - .db - .put_cf_opt( - &self.0.get_cf(Self::CF_NAME)?, - ck.to_bytes(), - bytes, - self.0.config.write_opts(), - ) - .with_context(|| format!("Error setting asset upload cache for {path:?}")) + #[inline] + fn key_bytes(key: &Self::Key) -> Self::KeyBytes<'_> { key.to_bytes() } + + #[inline] + fn value_bytes(value: &Self::Value) -> Self::ValueBytes<'_> { value.encode_to_vec() } + + #[inline] + fn parse_value(bytes: Vec) -> Result { + Self::OwnedValue::decode(&*bytes) } } diff --git a/src/cli.rs b/src/cli.rs index cd1d3cc..2f82678 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -77,14 +77,23 @@ pub struct CacheOpts { pub cache_lru_size: usize, } +/// Mixin options for concurrency +#[derive(clap::Args)] +pub struct ConcurrencyOpts { + /// Limit the number of concurrently-running jobs + #[arg(short = 'j', long = "jobs", default_value_t = 4)] + pub jobs: u16, +} + /// Top-level subcommands for hub #[derive(clap::Subcommand)] pub enum Subcommand { /// View and set configuration for the hub command Config(Config), + /// Mint queued NFTs from an open drop to a list of wallets + Airdrop(Airdrop), /// Upload files to Hub Upload(Upload), - Airdrop(Airdrop), } /// Options for hub config @@ -143,31 +152,39 @@ pub struct ConfigHubEndpoint { pub endpoint: Option, } -/// Options for hub upload -#[derive(clap::Args)] -pub struct Upload { - /// Name of the subcommand to run - #[command(subcommand)] - pub subcmd: UploadSubcommand, -} - -/// Options for hub upload +/// Options for hub airdrop #[derive(clap::Args)] pub struct Airdrop { + /// Concurrency options + #[command(flatten)] + pub concurrency: ConcurrencyOpts, + + /// UUID of the open drop to mint from #[arg(short = 'd', long = "drop")] pub drop_id: Uuid, - #[arg(short = 'w')] - pub wallets: PathBuf, - - #[arg(short = 'c', default_value_t = true)] - pub compressed: bool, + /// Do not mint compressed NFTs + #[arg(long)] + pub no_compressed: bool, - #[arg(short = 'n', default_value_t = 1)] + /// Number of NFTs to mint to each wallet specified + #[arg(short = 'n', long, default_value_t = 1)] pub mints_per_wallet: u32, - #[arg(short = 'j', default_value_t = 8)] - pub jobs: usize, + /// Path to one or more files containing newline-separated wallet addresses + /// to mint to + /// + /// Passing a single hyphen `-` will read from STDIN. + #[arg(required = true)] + pub wallets: Vec, +} + +/// Options for hub upload +#[derive(clap::Args)] +pub struct Upload { + /// Name of the subcommand to run + #[command(subcommand)] + pub subcmd: UploadSubcommand, } /// Subcommands for hub upload @@ -180,7 +197,11 @@ pub enum UploadSubcommand { /// Options for hub upload drop #[derive(clap::Args)] pub struct UploadDrop { - /// UUID of the drop to upload to + /// Concurrency options + #[command(flatten)] + pub concurrency: ConcurrencyOpts, + + /// UUID of the open drop to queue mints to #[arg(short = 'd', long = "drop")] pub drop_id: Uuid, @@ -188,10 +209,6 @@ pub struct UploadDrop { #[arg(short = 'I', long = "include")] pub include_dirs: Vec, - /// Limit the number of concurrently-running jobs - #[arg(short = 'j', long = "jobs", default_value_t = 4)] - pub jobs: u16, - /// Path to a directory containing metadata JSON files to upload #[arg(required = true)] pub input_dirs: Vec, diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs index 768052b..87c6547 100644 --- a/src/commands/airdrop.rs +++ b/src/commands/airdrop.rs @@ -1,59 +1,35 @@ -use core::fmt; use std::{ - borrow::Cow, + fmt, fs::File, io, - io::BufRead, - path::{Path, PathBuf}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - thread::sleep, + io::{prelude::*, BufReader}, + path::Path, + sync::Arc, time::Duration, }; use anyhow::{bail, Context as _, Result}; +use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder}; use crossbeam::channel::{self, Sender}; -use futures_util::FutureExt; use graphql_client::GraphQLQuery; -use log::{error, info, warn}; +use log::{info, warn}; use tokio::task::JoinHandle; use url::Url; +use uuid::Uuid; +use self::mint_random_queued_to_drop::{ + MintRandomQueuedInput, MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint, +}; use crate::{ - cache::{AirdropWalletsCache, Cache, CacheConfig, CreationStatus, ProtoMintRandomQueued}, + cache::{AirdropId, AirdropWalletsCache, Cache, CacheConfig, CreationStatus, MintRandomQueued}, cli::Airdrop, - commands::airdrop::mint_random_queued_to_drop::{ - MintRandomQueuedInput, MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint, + common::{ + concurrent, graphql::UUID, pubkey::Pubkey, reqwest::ClientExt, stats::Counter, + tokio::runtime, }, - common::{concurrent, error::format_errors, reqwest::ResponseExt, tokio::runtime}, config::Config, }; -#[allow(clippy::upper_case_acronyms)] -type UUID = uuid::Uuid; - -struct Pubkey([u8; 32]); - -impl fmt::Display for Pubkey { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", bs58::encode(&self.0).into_string()) - } -} - -impl TryFrom<&str> for Pubkey { - type Error = anyhow::Error; - - fn try_from(s: &str) -> Result { - let mut pubkey_bytes: [u8; 32] = [0; 32]; - bs58::decode(&s) - .onto(&mut pubkey_bytes) - .context("failed to parse string as pubkey") - .map(|_| Self(pubkey_bytes)) - } -} - #[derive(GraphQLQuery)] #[graphql( schema_path = "src/queries/schema.graphql", @@ -70,162 +46,208 @@ struct MintRandomQueuedToDrop; )] struct MintStatus; -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum Job { - Mint(MintRandomQueued), - CheckStatus(CheckMintStatus), - // RetryFailed(RetryFailedMint), +#[derive(Default)] +struct Stats { + queued_mints: Counter, + pending_mints: Counter, + failed_mints: Counter, + created_mints: Counter, } -impl Job { - #[inline] - fn run(self, ctx: Context) -> JoinHandle> { - match self { - Job::Mint(j) => j.run(ctx), - Job::CheckStatus(j) => j.run(ctx), +#[derive(Clone)] +struct Params<'a> { + drop_id: Uuid, + compressed: bool, + mints_per_wallet: u32, + tx: &'a Sender, + backoff: ExponentialBuilder, +} + +fn read_file(name: N, reader: R, params: Params) -> Result<()> { + let Params { + drop_id, + compressed, + mints_per_wallet, + tx, + backoff, + } = params; + + for line in reader.lines() { + let wallet: Pubkey = line + .map_err(anyhow::Error::new) + .and_then(|l| l.trim().parse().map_err(Into::into)) + .with_context(|| format!("Error parsing wallets file {name:?}"))?; + + for nft_number in 1..=mints_per_wallet { + tx.send(Job::Mint(MintRandomQueuedJob { + airdrop_id: AirdropId { wallet, nft_number }, + drop_id, + compressed, + backoff: backoff.build(), + })) + .context("Error seeding initial job queue")?; } } -} -#[derive(Default)] -struct Stats { - pending_mints: AtomicUsize, - failed_mints: AtomicUsize, - created_mints: AtomicUsize, + Ok(()) } -#[derive(Debug)] -struct CheckMintStatus { - path: PathBuf, - mint_id: UUID, - key: Cow<'static, str>, -} +pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { + let Airdrop { + concurrency, + drop_id, + no_compressed, + mints_per_wallet, + wallets, + } = args; -impl CheckMintStatus { - fn run(self, ctx: Context) -> JoinHandle> { - tokio::spawn(async move { - let Self { path, mint_id, key } = self; - let cache: AirdropWalletsCache = ctx.cache.get().await?; - let wallet = key.split_once(':').unwrap().0; + let (tx, rx) = channel::unbounded(); - let input = mint_status::Variables { id: mint_id }; + let params = Params { + drop_id, + compressed: !no_compressed, + mints_per_wallet, + tx: &tx, + backoff: ExponentialBuilder::default() + .with_jitter() + .with_factor(2.0) + .with_min_delay(Duration::from_secs(3)) + .with_max_times(5), + }; - let res = ctx - .client - .post(ctx.graphql_endpoint) - .json(&MintStatus::build_query(input)) - .send() - .await - .error_for_hub_status(|| format!("check mint status mutation for {path:?}"))? - .json::::ResponseData>>() - .await - .with_context(|| format!("Error parsing mutation response for {path:?}"))?; - - if let Some(data) = res.data { - format_errors(res.errors, (), |s| { - warn!("checkMintStatus mutation for {path:?} returned one or more errors:{s}"); - }); - - info!("Checking status for mint {mint_id:?}"); - - let response = data.mint.context("Mint not found")?; - let mint_status::MintStatusMint { - id, - creation_status, - } = response; - - // impl From trait - - match creation_status { - mint_status::CreationStatus::CREATED => { - info!("Mint {mint_id:?} airdropped to {wallet:?}"); - ctx.stats.pending_mints.fetch_sub(1, Ordering::Relaxed); - ctx.stats.created_mints.fetch_add(1, Ordering::Relaxed); - - cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { - mint_id: id.to_string(), - mint_address: None, - status: CreationStatus::Created.into(), - })?; - }, - mint_status::CreationStatus::PENDING => { - sleep(Duration::from_secs(3)); - ctx.q.send(Job::CheckStatus(CheckMintStatus { - path: path.clone(), - mint_id: id, - key: key.clone(), - }))?; - }, - _ => { - ctx.stats.failed_mints.fetch_add(1, Ordering::Relaxed); - info!("Mint {mint_id:?}:? failed. retrying.."); - cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { - mint_id: id.to_string(), - mint_address: None, - status: CreationStatus::Failed.into(), - })?; - }, - } - } else { - format_errors(res.errors, Ok(()), |s| { - bail!("checkMintStatus mutation for {path:?} returned one or more errors:{s}") - })?; + for path in wallets { + let params = params.clone(); + + if path.as_os_str() == "-" { + read_file("STDIN", BufReader::new(io::stdin()), params)?; + } else { + read_file( + &path, + BufReader::new( + File::open(&path) + .with_context(|| format!("Error opening wallets file {path:?}"))?, + ), + params, + )?; + } + } - bail!("CheckMintStatus mutation for {path:?} returned no data"); - } + let ctx = Context { + // TODO: what should the correct path for this be? + cache: Cache::load_sync(Path::new(".airdrops").join(drop_id.to_string()), cache)?, + graphql_endpoint: config.graphql_endpoint().clone(), + client: config.graphql_client()?, + q: tx, + stats: Arc::default(), + }; - Ok(()) - }) + runtime()?.block_on(async move { + let (res, any_errs) = + concurrent::try_run_channel(concurrency, rx, |j| j.run(ctx.clone())).await; + + let Stats { + queued_mints, + created_mints, + pending_mints, + failed_mints, + } = &*ctx.stats; + info!( + "Of {queued} new mint(s) queued: {created} created, {pending} still pending, {failed} \ + failed", + queued = queued_mints.load(), + created = created_mints.load(), + pending = pending_mints.load(), + failed = failed_mints.load(), + ); + + if any_errs { + warn!( + "Some mints were skipped due to errors. They will be processed next time this \ + command is run." + ); + } + + res + }) +} + +#[derive(Clone)] +struct Context { + cache: Cache, + graphql_endpoint: Url, + client: reqwest::Client, + q: Sender, + stats: Arc, +} + +#[derive(Debug)] +enum Job { + Mint(MintRandomQueuedJob), + CheckStatus(CheckMintStatusJob), +} + +impl Job { + #[inline] + fn run(self, ctx: Context) -> JoinHandle> { + match self { + Job::Mint(j) => j.run(ctx), + Job::CheckStatus(j) => j.run(ctx), + } } } #[derive(Debug)] -struct MintRandomQueued { - path: PathBuf, - key: Cow<'static, str>, - drop_id: UUID, +struct MintRandomQueuedJob { + airdrop_id: AirdropId, + drop_id: Uuid, compressed: bool, + backoff: ExponentialBackoff, } -impl MintRandomQueued { +impl MintRandomQueuedJob { fn run(self, ctx: Context) -> JoinHandle> { tokio::task::spawn(async move { let Self { - path, - key, + airdrop_id, drop_id, compressed, + backoff, } = self; + let AirdropId { + wallet, + nft_number: _, + } = airdrop_id; let cache: AirdropWalletsCache = ctx.cache.get().await?; - let record = cache.get_sync(path.clone(), key.to_string())?; + let record = cache.get(airdrop_id).await?; if let Some(r) = record { - let status = - CreationStatus::try_from(r.status).context("invalid creation status")?; - let wallet = key.split_once(':').unwrap().0; + let status = CreationStatus::try_from(r.status) + .with_context(|| format!("Missing creation status for {airdrop_id:?}"))?; + match status { CreationStatus::Created => { info!("Mint {:?} already airdropped to {wallet:?}", r.mint_id,); return Ok(()); }, CreationStatus::Failed => { - //retry here + // TODO: retry here - info!("Mint {:?} failed. retrying..", r.mint_id,); + warn!("Mint {:?} failed. Retrying...", r.mint_id); }, - CreationStatus::Pending => { - info!("Mint {:?} is pending. Checking status again...", r.mint_id,); - ctx.q.send(Job::CheckStatus(CheckMintStatus { - path, - mint_id: r.mint_id.parse()?, - key, - }))?; + info!("Mint {:?} is pending. Checking status again...", r.mint_id); + ctx.q + .send(Job::CheckStatus(CheckMintStatusJob { + airdrop_id, + mint_id: r.mint_id.parse().with_context(|| { + format!("Invalid mint ID for {airdrop_id:?}") + })?, + backoff, + })) + .context("Error submitting pending mint status check job")?; }, } } else { - let (wallet, _) = key.split_once(':').unwrap(); let input = mint_random_queued_to_drop::Variables { in_: MintRandomQueuedInput { drop: drop_id, @@ -234,67 +256,43 @@ impl MintRandomQueued { }, }; - let res = - ctx.client - .post(ctx.graphql_endpoint) - .json(&MintRandomQueuedToDrop::build_query(input)) - .send() - .await - .error_for_hub_status(|| format!("queueMintToDrop mutation for {path:?}"))? - .json::::ResponseData, - >>() - .await - .with_context(|| format!("Error parsing mutation response for {path:?}"))?; - - log::trace!("GraphQL response for {path:?}: {res:?}"); - - if let Some(data) = res.data { - format_errors(res.errors, (), |s| { - warn!( - "mintRandomQueuedToDrop mutation for {path:?} returned one or more \ - errors:{s}" - ); - }); - - let mint_random_queued_to_drop::ResponseData { - mint_random_queued_to_drop: - mint_random_queued_to_drop::MintRandomQueuedToDropMintRandomQueuedToDrop { - collection_mint: - MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint { - id, - .. - }, - }, - } = data; - - cache.set_sync(path.clone(), &key, &ProtoMintRandomQueued { + let res = ctx + .client + .graphql::() + .post(ctx.graphql_endpoint, input, || { + format!("mintRandomQueuedToDrop mutation for {airdrop_id:?}") + }) + .await?; + + let mint_random_queued_to_drop::ResponseData { + mint_random_queued_to_drop: + mint_random_queued_to_drop::MintRandomQueuedToDropMintRandomQueuedToDrop { + collection_mint: + MintRandomQueuedToDropMintRandomQueuedToDropCollectionMint { + id, .. + }, + }, + } = res.data; + + ctx.stats.queued_mints.increment(); + + cache + .set(airdrop_id, MintRandomQueued { mint_id: id.to_string(), mint_address: None, status: CreationStatus::Pending.into(), - })?; - - ctx.stats.pending_mints.fetch_add(1, Ordering::Relaxed); + }) + .await?; - ctx.q.send(Job::CheckStatus(CheckMintStatus { - path, + ctx.q + .send(Job::CheckStatus(CheckMintStatusJob { + airdrop_id, mint_id: id, - key: key.clone(), - }))?; - - info!("Pending for wallet {wallet:?}"); - } else { - format_errors(res.errors, Ok(()), |s| { - bail!( - "mintRandomQueuedToDrop mutation for {path:?} returned one or more \ - errors:{s}" - ) - })?; + backoff, + })) + .context("Error submitting mint status check job")?; - ctx.stats.failed_mints.fetch_add(1, Ordering::Relaxed); - - bail!("mintRandomQueuedToDrop mutation for {path:?} returned no data"); - } + info!("Pending for wallet {wallet:?}"); } Ok(()) @@ -302,101 +300,99 @@ impl MintRandomQueued { } } -#[derive(Clone)] -struct Context { - graphql_endpoint: Url, - client: reqwest::Client, - q: Sender, - cache: Cache, - stats: Arc, +#[derive(Debug)] +struct CheckMintStatusJob { + airdrop_id: AirdropId, + mint_id: Uuid, + backoff: ExponentialBackoff, } -pub fn run(config: &Config, cache: CacheConfig, args: Airdrop) -> Result<()> { - let mut any_errs = false; - - let Airdrop { - drop_id, - wallets, - compressed, - mints_per_wallet, - jobs, - } = args; - - let (tx, rx) = channel::unbounded(); - - let input_file = File::open(&wallets) - .with_context(|| format!("wallets file does not exist {:?}", wallets))?; - let reader = io::BufReader::new(input_file); - - for line in reader.lines() { - let line = line?; - let pubkey: Pubkey = line.trim_end_matches('\n').try_into()?; - - let mut nft_number = 1; - while nft_number <= mints_per_wallet { - let key: String = format!("{pubkey}:{nft_number}"); - tx.send(Job::Mint(MintRandomQueued { - key: Cow::Owned(key), - drop_id, - path: drop_id.to_string().into(), - compressed, - })) - .context("Error seeding initial job queue")?; - nft_number += 1; - } - } - - let ctx = Context { - graphql_endpoint: config.graphql_endpoint().clone(), - client: config.graphql_client()?, - q: tx, - cache: Cache::load_sync(Path::new(".airdrops").join(drop_id.to_string()), cache)?, - stats: Arc::default(), - }; - - runtime()?.block_on(async move { - let res = concurrent::try_run( - jobs, - |e| { - error!("{e:?}"); - any_errs = true; - }, - || { - let job = match rx.try_recv() { - Ok(j) => Some(j), - Err(channel::TryRecvError::Empty) => None, - Err(e) => return Err(e).context("Error getting job from queue"), - }; - - let Some(job) = job else { - return Ok(None); - }; - - log::trace!("Submitting job: {job:?}"); +impl CheckMintStatusJob { + fn run(self, ctx: Context) -> JoinHandle> { + tokio::spawn(async move { + let Self { + airdrop_id, + mint_id, + mut backoff, + } = self; + let cache: AirdropWalletsCache = ctx.cache.get().await?; - Ok(Some(job.run(ctx.clone()).map(|f| { - f.context("Worker task panicked").and_then(|r| r) - }))) - }, - ) - .await; + let res = ctx + .client + .graphql::() + .post( + ctx.graphql_endpoint, + mint_status::Variables { id: mint_id }, + || format!("mint creationStatus query for {airdrop_id:?}"), + ) + .await?; + + // TODO: review all logging calls to ensure we're outputting the + // correct amount of verbosity + info!("Checking status for mint {mint_id:?}"); + + let response = res + .data + .mint + .with_context(|| format!("Mint not found for {airdrop_id:?}"))?; + let mint_status::MintStatusMint { + id, + creation_status, + } = response; + + match creation_status { + mint_status::CreationStatus::CREATED => { + info!("Mint {mint_id:?} airdropped for {airdrop_id:?}"); + + ctx.stats.created_mints.increment(); + + cache + .set(airdrop_id, MintRandomQueued { + mint_id: id.to_string(), + mint_address: None, + status: CreationStatus::Created.into(), + }) + .await?; + }, + mint_status::CreationStatus::PENDING => { + let Some(dur) = backoff.next() else { + warn!("Timed out waiting for {airdrop_id:?} to complete"); + ctx.stats.pending_mints.increment(); - debug_assert!(rx.is_empty(), "Trailing jobs in queue"); + return Ok(()); + }; - let Stats { - created_mints, - pending_mints, - failed_mints, - } = &*ctx.stats; - info!( - "Created: {:?} Pending: {:?} Failed: {:?}", - created_mints.load(Ordering::Relaxed), - pending_mints.load(Ordering::Relaxed), - failed_mints.load(Ordering::Relaxed), - ); + tokio::time::sleep(dur).await; - res - })?; + ctx.q + .send(Job::CheckStatus(CheckMintStatusJob { + airdrop_id, + mint_id: id, + backoff, + })) + .context("Error submitting mint status check job")?; + }, + _ => { + let Some(dur) = backoff.next() else { + ctx.stats.failed_mints.increment(); + bail!("Mint {mint_id:?} for {airdrop_id:?} failed too many times"); + }; + + warn!("Mint {mint_id:?} failed. Retrying..."); + tokio::time::sleep(dur).await; + + cache + .set(airdrop_id, MintRandomQueued { + mint_id: id.to_string(), + mint_address: None, + status: CreationStatus::Failed.into(), + }) + .await + .context("Error submitting mint retry job")?; + }, + } - Ok(()) + Ok(()) + }) + } } diff --git a/src/commands/upload_drop.rs b/src/commands/upload_drop.rs index e7b7609..dd957db 100644 --- a/src/commands/upload_drop.rs +++ b/src/commands/upload_drop.rs @@ -6,22 +6,18 @@ use std::{ iter, num::NonZeroUsize, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, }; -use anyhow::{bail, Context as _, Result}; +use anyhow::{Context as _, Result}; use crossbeam::{ channel::{self, Sender}, queue::ArrayQueue, }; use dashmap::DashMap; -use futures_util::FutureExt; use graphql_client::GraphQLQuery; use itertools::Either; -use log::{error, info, trace, warn}; +use log::{info, trace, warn}; use reqwest::multipart; use serde::{Deserialize, Serialize}; use tokio::task::JoinHandle; @@ -33,9 +29,10 @@ use crate::{ cli::UploadDrop, common::{ concurrent, - error::format_errors, + graphql::UUID, metadata_json::{self, MetadataJson}, - reqwest::ResponseExt, + reqwest::{ClientExt, ResponseExt}, + stats::Counter, tokio::runtime, toposort::{Dependencies, Dependency, PendingFail}, url_permissive::PermissiveUrl, @@ -51,9 +48,6 @@ struct UploadedAsset { url: Url, } -#[allow(clippy::upper_case_acronyms)] -type UUID = Uuid; - #[derive(GraphQLQuery)] #[graphql( schema_path = "src/queries/schema.graphql", @@ -141,15 +135,15 @@ impl From for queue_mint_to_drop::MetadataJsonFileInput { #[derive(Default)] struct Stats { - uploaded_assets: AtomicUsize, - queued_mints: AtomicUsize, + uploaded_assets: Counter, + queued_mints: Counter, } pub fn run(config: &Config, cache: CacheConfig, args: UploadDrop) -> Result<()> { let UploadDrop { + concurrency, drop_id, include_dirs, - jobs, input_dirs, } = args; let include_dirs: HashSet<_> = include_dirs.into_iter().collect(); @@ -209,35 +203,9 @@ pub fn run(config: &Config, cache: CacheConfig, args: UploadDrop) -> Result<()> stats: Arc::default(), }; - let mut any_errs = false; runtime()?.block_on(async move { - let res = concurrent::try_run( - jobs.into(), - |e| { - error!("{e:?}"); - any_errs = true; - }, - || { - let job = match rx.try_recv() { - Ok(j) => Some(j), - Err(channel::TryRecvError::Empty) => None, - Err(e) => return Err(e).context("Error getting job from queue"), - }; - - let Some(job) = job else { - return Ok(None); - }; - - trace!("Submitting job: {job:?}"); - - Ok(Some(job.run(ctx.clone()).map(|f| { - f.context("Worker task panicked").and_then(|r| r) - }))) - }, - ) - .await; - - debug_assert!(rx.is_empty(), "Trailing jobs in queue"); + let (res, any_errs) = + concurrent::try_run_channel(concurrency, rx, |j| j.run(ctx.clone())).await; let Stats { uploaded_assets, @@ -245,8 +213,8 @@ pub fn run(config: &Config, cache: CacheConfig, args: UploadDrop) -> Result<()> } = &*ctx.stats; info!( "Uploaded {assets} asset(s) and queued {mints} mint(s)", - assets = uploaded_assets.load(std::sync::atomic::Ordering::Relaxed), - mints = queued_mints.load(std::sync::atomic::Ordering::Relaxed) + assets = uploaded_assets.load(), + mints = queued_mints.load() ); if any_errs { @@ -257,9 +225,7 @@ pub fn run(config: &Config, cache: CacheConfig, args: UploadDrop) -> Result<()> } res - })?; - - Ok(()) + }) } #[derive(Clone)] @@ -306,6 +272,26 @@ impl Context { } } +// The cost of shuffling these around is probably less than the cost of allocation +#[allow(clippy::large_enum_variant)] +#[derive(Debug)] +enum Job { + ScanJson(ScanJsonJob), + UploadAsset(UploadAssetJob), + QueueJson(QueueJsonJob), +} + +impl Job { + #[inline] + fn run(self, ctx: Context) -> JoinHandle> { + match self { + Job::ScanJson(j) => j.run(ctx), + Job::UploadAsset(j) => j.run(ctx), + Job::QueueJson(j) => j.run(ctx), + } + } +} + #[derive(Debug)] struct ScanJsonJob { dir: PathBuf, @@ -468,14 +454,15 @@ impl UploadAssetJob { let ck = Checksum::read_rewind_async(&path, &mut file).await?; let cache: AssetUploadCache = ctx.cache(dir)?.get().await?; - let cached_url = cache - .get(path.clone(), ck) - .await? - .and_then(|AssetUpload { url }| { - Url::parse(&url) - .map_err(|e| warn!("Invalid cache URL {url:?}: {e}")) - .ok() - }); + let cached_url = + cache + .get_named(path.clone(), ck) + .await? + .and_then(|AssetUpload { url }| { + Url::parse(&url) + .map_err(|e| warn!("Invalid cache URL {url:?}: {e}")) + .ok() + }); let dest_url; if let Some(url) = cached_url { @@ -523,11 +510,11 @@ impl UploadAssetJob { .find(|u| u.name == name) .with_context(|| format!("Missing upload response data for {path:?}"))?; - ctx.stats.uploaded_assets.fetch_add(1, Ordering::Relaxed); + ctx.stats.uploaded_assets.increment(); info!("Successfully uploaded {path:?}"); cache - .set(path.clone(), ck, AssetUpload { + .set_named(path.clone(), ck, AssetUpload { url: upload.url.to_string(), }) .await @@ -632,7 +619,7 @@ impl QueueJsonJob { serde_json::to_string(&input).map_or_else(|e| e.to_string(), |j| j.to_string()) ); - let cached_mint = cache.get(path.clone(), ck).await?; + let cached_mint = cache.get_named(path.clone(), ck).await?; if let Some(mint) = cached_mint { trace!("Using cached mint {mint:?} for {path:?}"); @@ -649,53 +636,28 @@ impl QueueJsonJob { } } else { let res = ctx - .client - .post(ctx.graphql_endpoint) - .json(&QueueMintToDrop::build_query(input)) - .send() - .await - .error_for_hub_status(|| format!("queueMintToDrop mutation for {path:?}"))? - .json::::ResponseData>>() - .await - .with_context(|| { - format!("Error parsing queueMintToDrop mutation response for {path:?}") - })?; - - trace!("GraphQL response for {path:?}: {res:?}"); - - let collection_mint; - if let Some(data) = res.data { - format_errors(res.errors, (), |s| { - warn!( - "queueMintToDrop mutation for {path:?} returned one or more errors:{s}" - ); - }); + .client + .graphql::() + .post(ctx.graphql_endpoint, input, || { + format!("queueMintToDrop mutation for {path:?}") + }) + .await?; - let queue_mint_to_drop::ResponseData { - queue_mint_to_drop: - queue_mint_to_drop::QueueMintToDropQueueMintToDrop { - collection_mint: - queue_mint_to_drop::QueueMintToDropQueueMintToDropCollectionMint { - id, - }, - }, - } = data; - collection_mint = id; - - ctx.stats.queued_mints.fetch_add(1, Ordering::Relaxed); - info!("Mint successfully queued for {path:?}"); - } else { - format_errors(res.errors, Ok(()), |s| { - bail!( - "queueMintToDrop mutation for {path:?} returned one or more errors:{s}" - ) - })?; + let queue_mint_to_drop::ResponseData { + queue_mint_to_drop: + queue_mint_to_drop::QueueMintToDropQueueMintToDrop { + collection_mint: + queue_mint_to_drop::QueueMintToDropQueueMintToDropCollectionMint { + id: collection_mint, + }, + }, + } = res.data; - bail!("queueMintToDrop mutation for {path:?} returned no data"); - } + ctx.stats.queued_mints.increment(); + info!("Mint successfully queued for {path:?}"); cache - .set(path, ck, DropMint { + .set_named(path, ck, DropMint { collection_mint: collection_mint.to_bytes_le().into(), input_hash: input_ck.to_bytes().into(), }) @@ -714,23 +676,3 @@ impl PendingFail for QueueJsonJob { warn!("Skipping {:?} due to failed dependencies", self.path); } } - -// The cost of shuffling these around is probably less than the cost of allocation -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -enum Job { - ScanJson(ScanJsonJob), - UploadAsset(UploadAssetJob), - QueueJson(QueueJsonJob), -} - -impl Job { - #[inline] - fn run(self, ctx: Context) -> JoinHandle> { - match self { - Job::ScanJson(j) => j.run(ctx), - Job::UploadAsset(j) => j.run(ctx), - Job::QueueJson(j) => j.run(ctx), - } - } -} diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index c557a7c..6f34e9e 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,6 +1,10 @@ -use std::future::Future; +use std::{fmt, future::Future}; -use futures_util::{stream::FuturesUnordered, StreamExt}; +use anyhow::Context; +use crossbeam::channel; +use futures_util::{stream::FuturesUnordered, FutureExt, StreamExt}; + +use crate::cli::ConcurrencyOpts; pub async fn try_run< F: Future>, @@ -8,10 +12,12 @@ pub async fn try_run< G: FnMut() -> Result, E>, H: FnMut(E), >( - jobs: usize, + opts: ConcurrencyOpts, mut err: H, mut get_job: G, ) -> Result<(), E> { + let ConcurrencyOpts { jobs } = opts; + let jobs = jobs.into(); let mut futures = FuturesUnordered::new(); 'run: loop { @@ -37,3 +43,44 @@ pub async fn try_run< Ok(()) } + +#[inline] +pub async fn try_run_channel< + J: fmt::Debug, + F: FnMut(J) -> tokio::task::JoinHandle>, +>( + opts: ConcurrencyOpts, + rx: channel::Receiver, + mut run: F, +) -> (Result<(), anyhow::Error>, bool) { + let mut any_errs = false; + let res = try_run( + opts, + |e| { + log::error!("{e:?}"); + any_errs = true; + }, + || { + let job = match rx.try_recv() { + Ok(j) => Some(j), + Err(channel::TryRecvError::Empty) => None, + Err(e) => return Err(e).context("Error getting job from queue"), + }; + + let Some(job) = job else { + return Ok(None); + }; + + log::trace!("Submitting job: {job:?}"); + + Ok(Some(run(job).map(|f| { + f.context("Worker task panicked").and_then(|r| r) + }))) + }, + ) + .await; + + debug_assert!(rx.is_empty(), "Trailing jobs in queue"); + + (res, any_errs) +} diff --git a/src/common/error.rs b/src/common/error.rs deleted file mode 100644 index d34ff72..0000000 --- a/src/common/error.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::fmt::Write; - -pub(crate) fn format_errors( - errors: Option>, - ok: T, - f: impl FnOnce(String) -> T, -) -> T { - let mut errs = errors.into_iter().flatten().peekable(); - - if errs.peek().is_some() { - let mut s = String::new(); - - for err in errs { - write!(s, "\n {err}").unwrap(); - } - - f(s) - } else { - ok - } -} diff --git a/src/common/graphql.rs b/src/common/graphql.rs new file mode 100644 index 0000000..2fd03bc --- /dev/null +++ b/src/common/graphql.rs @@ -0,0 +1,63 @@ +use std::{ + collections::HashMap, + fmt::{self, Write}, +}; + +use anyhow::{anyhow, Result}; +use graphql_client::Response; + +#[allow(clippy::upper_case_acronyms)] +pub type UUID = uuid::Uuid; + +#[derive(Debug)] +pub struct CheckedResponse { + pub data: T, + pub extensions: Option>, +} + +pub fn result_for_errors D + Clone, D: fmt::Display>( + res: Response, + name: F, +) -> Result> { + log::trace!("GraphQL response for {}: {res:?}", (name.clone())()); + + let Response { + data, + errors, + extensions, + } = res; + + if let Some(data) = data { + format_errors(errors, (), |s| { + log::warn!("{} returned one or more errors:{s}", name()); + }); + + Ok(CheckedResponse { data, extensions }) + } else { + let n = name.clone(); + Err(format_errors(errors, None, |s| { + Some(anyhow!("{} returned one or more errors:{s}", n())) + }) + .unwrap_or_else(|| anyhow!("{} returned no data", name()))) + } +} + +pub fn format_errors( + errors: Option>, + ok: T, + f: impl FnOnce(String) -> T, +) -> T { + let mut errs = errors.into_iter().flatten().peekable(); + + if errs.peek().is_some() { + let mut s = String::new(); + + for err in errs { + write!(s, "\n {err}").unwrap(); + } + + f(s) + } else { + ok + } +} diff --git a/src/common/pubkey.rs b/src/common/pubkey.rs new file mode 100644 index 0000000..a1225d8 --- /dev/null +++ b/src/common/pubkey.rs @@ -0,0 +1,43 @@ +use std::{fmt, str::FromStr}; + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct Pubkey([u8; 32]); + +impl Pubkey { + #[inline] + pub fn to_bytes(self) -> [u8; 32] { self.0 } +} + +impl fmt::Debug for Pubkey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Pubkey({self})") } +} + +impl fmt::Display for Pubkey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", bs58::encode(&self.0).into_string()) + } +} + +impl FromStr for Pubkey { + type Err = bs58::decode::Error; + + fn from_str(s: &str) -> Result { + let mut buf = [0_u8; 32]; + bs58::decode(&s).onto(&mut buf)?; + Ok(Self(buf)) + } +} + +impl TryFrom for Pubkey { + type Error = ::Err; + + #[inline] + fn try_from(value: String) -> Result { value.parse() } +} + +impl TryFrom<&str> for Pubkey { + type Error = ::Err; + + #[inline] + fn try_from(value: &str) -> Result { value.parse() } +} diff --git a/src/common/reqwest.rs b/src/common/reqwest.rs index 1777e4c..c2f770d 100644 --- a/src/common/reqwest.rs +++ b/src/common/reqwest.rs @@ -1,7 +1,56 @@ -use std::fmt; +use std::{fmt, marker::PhantomData}; use anyhow::{Context, Result}; -use reqwest::{Response, StatusCode}; +use graphql_client::{GraphQLQuery, Response as GraphQlResponse}; +use reqwest::{Client, Response, StatusCode}; + +use super::graphql; + +pub trait ClientExt { + fn graphql(&self) -> GraphQlClient; +} + +impl ClientExt for Client { + #[inline] + fn graphql(&self) -> GraphQlClient { GraphQlClient(self, PhantomData) } +} + +#[derive(Clone, Copy)] +#[repr(transparent)] +pub struct GraphQlClient<'a, Q>(&'a Client, PhantomData<&'a Q>); + +impl<'a, Q: GraphQLQuery> GraphQlClient<'a, Q> { + // Avoid using the async_trait Box hack by just wrapping Client in a newtype + pub async fn post_unchecked D + Clone, D: fmt::Display>( + self, + url: U, + vars: Q::Variables, + name: F, + ) -> Result> { + self.0 + .post(url) + .json(&Q::build_query(vars)) + .send() + .await + .error_for_hub_status(name.clone())? + .json::>() + .await + .with_context(|| format!("Error parsing JSON from {}", name())) + } + + #[inline] + pub async fn post D + Clone, D: fmt::Display>( + self, + url: U, + vars: Q::Variables, + name: F, + ) -> Result> + where + Q::ResponseData: fmt::Debug, + { + graphql::result_for_errors(self.post_unchecked(url, vars, name.clone()).await?, name) + } +} pub trait ResponseExt: Sized { fn error_for_hub_status D, D: fmt::Display>(self, name: F) -> Result; diff --git a/src/common/stats.rs b/src/common/stats.rs new file mode 100644 index 0000000..54fb46c --- /dev/null +++ b/src/common/stats.rs @@ -0,0 +1,10 @@ +use std::sync::atomic::{AtomicU64, Ordering}; + +#[derive(Default)] +pub struct Counter(AtomicU64); + +impl Counter { + pub fn increment(&self) { self.0.fetch_add(1, Ordering::AcqRel); } + + pub fn load(&self) -> u64 { self.0.load(Ordering::Acquire) } +} diff --git a/src/common/url_permissive.rs b/src/common/url_permissive.rs index 3e1901d..ed1dfdb 100644 --- a/src/common/url_permissive.rs +++ b/src/common/url_permissive.rs @@ -80,8 +80,7 @@ impl<'de> Deserialize<'de> for PermissiveUrl { fn visit_str(self, s: &str) -> Result where E: de::Error { PermissiveUrl::from_str(s).map_err(|err| { - let err_s = format!("{}", err); - de::Error::invalid_value(de::Unexpected::Str(s), &err_s.as_str()) + de::Error::invalid_value(de::Unexpected::Str(s), &err.to_string().as_str()) }) } } diff --git a/src/main.rs b/src/main.rs index 23556be..97684cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,9 +23,11 @@ mod commands { mod common { pub mod concurrent; - pub mod error; + pub mod graphql; pub mod metadata_json; + pub mod pubkey; pub mod reqwest; + pub mod stats; pub mod tokio; pub mod toposort; pub mod url_permissive; @@ -75,10 +77,10 @@ mod entry { match subcmd { Subcommand::Config(c) => config::run(&config(true)?, c), + Subcommand::Airdrop(a) => airdrop::run(&read(config)?, cache, a), Subcommand::Upload(u) => match u.subcmd { UploadSubcommand::Drop(d) => upload_drop::run(&read(config)?, cache, d), }, - Subcommand::Airdrop(a) => airdrop::run(&read(config)?, cache, a), } } } diff --git a/src/proto/mint-random-queued.proto b/src/proto/mint-random-queued.proto index 451db01..41a2de6 100644 --- a/src/proto/mint-random-queued.proto +++ b/src/proto/mint-random-queued.proto @@ -9,7 +9,7 @@ message MintRandomQueued { } enum CreationStatus { - Created = 0; - Failed = 1; - Pending = 2; -} \ No newline at end of file + CREATED = 0; + FAILED = 1; + PENDING = 2; +} From 7694e9cc07259c4979d33bdd7f7b8d7655ffd61c Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Thu, 5 Oct 2023 10:27:02 +0500 Subject: [PATCH 6/6] adjust warn msg for failed mint --- src/commands/airdrop.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands/airdrop.rs b/src/commands/airdrop.rs index 87c6547..af2008b 100644 --- a/src/commands/airdrop.rs +++ b/src/commands/airdrop.rs @@ -232,7 +232,7 @@ impl MintRandomQueuedJob { CreationStatus::Failed => { // TODO: retry here - warn!("Mint {:?} failed. Retrying...", r.mint_id); + warn!("Mint {:?} failed.", r.mint_id); }, CreationStatus::Pending => { info!("Mint {:?} is pending. Checking status again...", r.mint_id); @@ -378,7 +378,7 @@ impl CheckMintStatusJob { bail!("Mint {mint_id:?} for {airdrop_id:?} failed too many times"); }; - warn!("Mint {mint_id:?} failed. Retrying..."); + warn!("Mint {mint_id:?} failed."); tokio::time::sleep(dur).await; cache