Skip to content

Commit

Permalink
make consistency check async
Browse files Browse the repository at this point in the history
  • Loading branch information
syphar committed Jan 12, 2025
1 parent da1f235 commit ab5e2c1
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 87 deletions.
3 changes: 2 additions & 1 deletion src/bin/cratesfyi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,8 @@ impl DatabaseSubcommand {
Self::Limits { command } => command.handle_args(ctx)?,

Self::Synchronize { dry_run } => {
docs_rs::utils::consistency::run_check(&ctx, dry_run)?;
ctx.runtime()?
.block_on(docs_rs::utils::consistency::run_check(&ctx, dry_run))?;
}
}
Ok(())
Expand Down
28 changes: 18 additions & 10 deletions src/utils/consistency/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,31 @@ pub(super) async fn load(conn: &mut sqlx::PgConnection, config: &Config) -> Resu
#[cfg(test)]
mod tests {
use super::*;
use crate::test::wrapper;
use crate::test::async_wrapper;

#[test]
fn test_load() {
wrapper(|env| {
env.build_queue().add_crate("queued", "0.0.1", 0, None)?;
env.fake_release().name("krate").version("0.0.2").create()?;
env.fake_release()
async_wrapper(|env| async move {
env.async_build_queue()
.await
.add_crate("queued", "0.0.1", 0, None)
.await?;
env.async_fake_release()
.await
.name("krate")
.version("0.0.2")
.create_async()
.await?;
env.async_fake_release()
.await
.name("krate")
.version("0.0.3")
.yanked(true)
.create()?;
.create_async()
.await?;

let result = env.runtime().block_on(async {
let mut conn = env.async_db().await.async_conn().await;
load(&mut conn, &env.config()).await
})?;
let mut conn = env.async_db().await.async_conn().await;
let result = load(&mut conn, &env.config()).await?;

assert_eq!(
result,
Expand Down
168 changes: 92 additions & 76 deletions src/utils/consistency/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{db::delete, Context};
use crate::{db::delete, utils::spawn_blocking, Context};
use anyhow::{Context as _, Result};
use itertools::Itertools;
use tracing::{info, warn};
Expand All @@ -24,24 +24,25 @@ const BUILD_PRIORITY: i32 = 15;
///
/// Even when activities fail, the command can just be re-run. While the diff calculation will
/// be repeated, we won't re-execute fixing activities.
pub fn run_check<C: Context>(ctx: &C, dry_run: bool) -> Result<()> {
pub async fn run_check<C: Context>(ctx: &C, dry_run: bool) -> Result<()> {
let index = ctx.index()?;

info!("Loading data from database...");
let db_data = ctx
.runtime()?
.block_on(async {
let mut conn = ctx.pool()?.get_async().await?;
db::load(&mut conn, &*ctx.config()?).await
})
let mut conn = ctx.async_pool().await?.get_async().await?;
let db_data = db::load(&mut conn, &*ctx.config()?)
.await
.context("Loading crate data from database for consistency check")?;

tracing::info!("Loading data from index...");
let index_data =
index::load(&index).context("Loading crate data from index for consistency check")?;
let index_data = spawn_blocking({
let index = index.clone();
move || index::load(&index)
})
.await
.context("Loading crate data from index for consistency check")?;

let diff = diff::calculate_diff(db_data.iter(), index_data.iter());
let result = handle_diff(ctx, diff.iter(), dry_run)?;
let result = handle_diff(ctx, diff.iter(), dry_run).await?;

println!("============");
println!("SUMMARY");
Expand Down Expand Up @@ -79,29 +80,27 @@ struct HandleResult {
yanks_corrected: u32,
}

fn handle_diff<'a, I, C>(ctx: &C, iter: I, dry_run: bool) -> Result<HandleResult>
async fn handle_diff<'a, I, C>(ctx: &C, iter: I, dry_run: bool) -> Result<HandleResult>
where
I: Iterator<Item = &'a diff::Difference>,
C: Context,
{
let mut result = HandleResult::default();

let config = ctx.config()?;
let runtime = ctx.runtime()?;

let storage = runtime.block_on(ctx.async_storage())?;
let build_queue = ctx.build_queue()?;
let storage = ctx.async_storage().await?;
let build_queue = ctx.async_build_queue().await?;

let mut conn = runtime.block_on(ctx.pool()?.get_async())?;
let mut conn = ctx.async_pool().await?.get_async().await?;

for difference in iter {
println!("{difference}");

match difference {
diff::Difference::CrateNotInIndex(name) => {
if !dry_run {
if let Err(err) =
runtime.block_on(delete::delete_crate(&mut conn, &storage, &config, name))
if let Err(err) = delete::delete_crate(&mut conn, &storage, &config, name).await
{
warn!("{:?}", err);
}
Expand All @@ -111,7 +110,9 @@ where
diff::Difference::CrateNotInDb(name, versions) => {
for version in versions {
if !dry_run {
if let Err(err) = build_queue.add_crate(name, version, BUILD_PRIORITY, None)
if let Err(err) = build_queue
.add_crate(name, version, BUILD_PRIORITY, None)
.await
{
warn!("{:?}", err);
}
Expand All @@ -121,25 +122,28 @@ where
}
diff::Difference::ReleaseNotInIndex(name, version) => {
if !dry_run {
if let Err(err) = runtime.block_on(delete::delete_version(
&mut conn, &storage, &config, name, version,
)) {
if let Err(err) =
delete::delete_version(&mut conn, &storage, &config, name, version).await
{
warn!("{:?}", err);
}
}
result.releases_deleted += 1;
}
diff::Difference::ReleaseNotInDb(name, version) => {
if !dry_run {
if let Err(err) = build_queue.add_crate(name, version, BUILD_PRIORITY, None) {
if let Err(err) = build_queue
.add_crate(name, version, BUILD_PRIORITY, None)
.await
{
warn!("{:?}", err);
}
}
result.builds_queued += 1;
}
diff::Difference::ReleaseYank(name, version, yanked) => {
if !dry_run {
if let Err(err) = build_queue.set_yanked(name, version, *yanked) {
if let Err(err) = build_queue.set_yanked(name, version, *yanked).await {
warn!("{:?}", err);
}
}
Expand All @@ -155,57 +159,55 @@ where
mod tests {
use super::diff::Difference;
use super::*;
use crate::test::{wrapper, TestEnvironment};
use crate::test::{async_wrapper, TestEnvironment};
use sqlx::Row as _;

fn count(env: &TestEnvironment, sql: &str) -> Result<i64> {
Ok(env.runtime().block_on(async {
let mut conn = env.async_db().await.async_conn().await;
sqlx::query_scalar(sql).fetch_one(&mut *conn).await
})?)
async fn count(env: &TestEnvironment, sql: &str) -> Result<i64> {
let mut conn = env.async_db().await.async_conn().await;
Ok(sqlx::query_scalar(sql).fetch_one(&mut *conn).await?)
}

fn single_row<O>(env: &TestEnvironment, sql: &str) -> Result<Vec<O>>
async fn single_row<O>(env: &TestEnvironment, sql: &str) -> Result<Vec<O>>
where
O: Send + Unpin + for<'r> sqlx::Decode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres>,
{
env.runtime().block_on(async {
let mut conn = env.async_db().await.async_conn().await;
Ok::<_, anyhow::Error>(
sqlx::query(sql)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|row| row.get(0))
.collect(),
)
})
let mut conn = env.async_db().await.async_conn().await;
Ok::<_, anyhow::Error>(
sqlx::query(sql)
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|row| row.get(0))
.collect(),
)
}

#[test]
fn test_delete_crate() {
wrapper(|env| {
env.fake_release()
async_wrapper(|env| async move {
env.async_fake_release()
.await
.name("krate")
.version("0.1.1")
.version("0.1.2")
.create()?;
.create_async()
.await?;

let diff = [Difference::CrateNotInIndex("krate".into())];

// calling with dry-run leads to no change
handle_diff(env, diff.iter(), true)?;
handle_diff(&*env, diff.iter(), true).await?;

assert_eq!(
count(env, "SELECT count(*) FROM crates WHERE name = 'krate'")?,
count(&env, "SELECT count(*) FROM crates WHERE name = 'krate'").await?,
1
);

// without dry-run the crate will be deleted
handle_diff(env, diff.iter(), false)?;
handle_diff(&*env, diff.iter(), false).await?;

assert_eq!(
count(env, "SELECT count(*) FROM crates WHERE name = 'krate'")?,
count(&env, "SELECT count(*) FROM crates WHERE name = 'krate'").await?,
0
);

Expand All @@ -215,25 +217,35 @@ mod tests {

#[test]
fn test_delete_release() {
wrapper(|env| {
env.fake_release().name("krate").version("0.1.1").create()?;
env.fake_release().name("krate").version("0.1.2").create()?;
async_wrapper(|env| async move {
env.async_fake_release()
.await
.name("krate")
.version("0.1.1")
.create_async()
.await?;
env.async_fake_release()
.await
.name("krate")
.version("0.1.2")
.create_async()
.await?;

let diff = [Difference::ReleaseNotInIndex(
"krate".into(),
"0.1.1".into(),
)];

assert_eq!(count(env, "SELECT count(*) FROM releases")?, 2);
assert_eq!(count(&env, "SELECT count(*) FROM releases").await?, 2);

handle_diff(env, diff.iter(), true)?;
handle_diff(&*env, diff.iter(), true).await?;

assert_eq!(count(env, "SELECT count(*) FROM releases")?, 2);
assert_eq!(count(&env, "SELECT count(*) FROM releases").await?, 2);

handle_diff(env, diff.iter(), false)?;
handle_diff(&*env, diff.iter(), false).await?;

assert_eq!(
single_row::<String>(env, "SELECT version FROM releases")?,
single_row::<String>(&env, "SELECT version FROM releases").await?,
vec!["0.1.2"]
);

Expand All @@ -243,30 +255,32 @@ mod tests {

#[test]
fn test_wrong_yank() {
wrapper(|env| {
env.fake_release()
async_wrapper(|env| async move {
env.async_fake_release()
.await
.name("krate")
.version("0.1.1")
.yanked(true)
.create()?;
.create_async()
.await?;

let diff = [Difference::ReleaseYank(
"krate".into(),
"0.1.1".into(),
false,
)];

handle_diff(env, diff.iter(), true)?;
handle_diff(&*env, diff.iter(), true).await?;

assert_eq!(
single_row::<bool>(env, "SELECT yanked FROM releases")?,
single_row::<bool>(&env, "SELECT yanked FROM releases").await?,
vec![true]
);

handle_diff(env, diff.iter(), false)?;
handle_diff(&*env, diff.iter(), false).await?;

assert_eq!(
single_row::<bool>(env, "SELECT yanked FROM releases")?,
single_row::<bool>(&env, "SELECT yanked FROM releases").await?,
vec![false]
);

Expand All @@ -276,20 +290,21 @@ mod tests {

#[test]
fn test_missing_release_in_db() {
wrapper(|env| {
async_wrapper(|env| async move {
let diff = [Difference::ReleaseNotInDb("krate".into(), "0.1.1".into())];

handle_diff(env, diff.iter(), true)?;
handle_diff(&*env, diff.iter(), true).await?;

let build_queue = env.build_queue();
let build_queue = env.async_build_queue().await;

assert!(build_queue.queued_crates()?.is_empty());
assert!(build_queue.queued_crates().await?.is_empty());

handle_diff(env, diff.iter(), false)?;
handle_diff(&*env, diff.iter(), false).await?;

assert_eq!(
build_queue
.queued_crates()?
.queued_crates()
.await?
.iter()
.map(|c| (c.name.as_str(), c.version.as_str(), c.priority))
.collect::<Vec<_>>(),
Expand All @@ -301,23 +316,24 @@ mod tests {

#[test]
fn test_missing_crate_in_db() {
wrapper(|env| {
async_wrapper(|env| async move {
let diff = [Difference::CrateNotInDb(
"krate".into(),
vec!["0.1.1".into(), "0.1.2".into()],
)];

handle_diff(env, diff.iter(), true)?;
handle_diff(&*env, diff.iter(), true).await?;

let build_queue = env.build_queue();
let build_queue = env.async_build_queue().await;

assert!(build_queue.queued_crates()?.is_empty());
assert!(build_queue.queued_crates().await?.is_empty());

handle_diff(env, diff.iter(), false)?;
handle_diff(&*env, diff.iter(), false).await?;

assert_eq!(
build_queue
.queued_crates()?
.queued_crates()
.await?
.iter()
.map(|c| (c.name.as_str(), c.version.as_str(), c.priority))
.collect::<Vec<_>>(),
Expand Down

0 comments on commit ab5e2c1

Please sign in to comment.