diff --git a/src/commands/check.rs b/src/commands/check.rs index 1dbf85b..1856dea 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -3,10 +3,10 @@ use std::collections::HashMap; use anyhow::Result; use bytes::Bytes; use clap::Parser; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::TryStreamExt; use indicatif::ProgressBar; use log::*; -use tokio::task::spawn_blocking; +use rayon::prelude::*; use zstd::stream::decode_all; use super::{progress_bytes, progress_counter}; @@ -49,7 +49,7 @@ pub(super) async fn execute( let p = progress_bytes(format!("checking {} in cache...", file_type.name())); // TODO: Make concurrency (20) customizable - check_cache_files(20, cache, raw_be, file_type, p).await?; + check_cache_files(20, cache, raw_be, file_type, p)?; } } } @@ -66,7 +66,7 @@ pub(super) async fn execute( if let Some(cache) = &cache { let p = progress_bytes("checking packs in cache..."); // TODO: Make concurrency (5) customizable - check_cache_files(5, cache, raw_be, FileType::Pack, p).await?; + check_cache_files(5, cache, raw_be, FileType::Pack, p)?; } } @@ -76,26 +76,20 @@ pub(super) async fn execute( if opts.read_data { let p = progress_counter("reading pack data..."); - stream::iter(index_be.into_index().into_iter().map(|pack| { - let be = be.clone(); - let p = p.clone(); - (pack, be, p) - })) - // TODO: Make concurrency (4) customizable - .for_each_concurrent(4, |(pack, be, p)| async move { - let id = pack.id; - let data = be.read_full(FileType::Pack, &id).unwrap(); - spawn_blocking(move || { - match check_pack(&be, pack, data) { + + index_be + .into_index() + .into_iter() + .par_bridge() + .for_each_with((be.clone(), p.clone()), |(be, p), pack| { + let id = pack.id; + let data = be.read_full(FileType::Pack, &id).unwrap(); + match check_pack(be, pack, data) { Ok(()) => {} Err(err) => error!("Error reading pack {id} : {err}",), } p.inc(1); - }) - .await - .unwrap() - }) - .await; + }); p.finish(); } @@ -133,8 +127,8 @@ fn check_hot_files( Ok(()) } -async fn check_cache_files( - concurrency: usize, +fn check_cache_files( + _concurrency: usize, cache: &Cache, be: &impl ReadBackend, file_type: FileType, @@ -149,13 +143,8 @@ async fn check_cache_files( let total_size = files.iter().map(|(_, size)| *size as u64).sum(); p.set_length(total_size); - stream::iter(files.into_iter().map(|file| { - let cache = cache.clone(); - let be = be.clone(); - let p = p.clone(); - (file, cache, be, p) - })) - .for_each_concurrent(concurrency, |((id, size), cache, be, p)| async move { + files.into_par_iter() + .for_each_with((cache,be,p.clone()), |(cache, be, p),(id, size)| { // Read file from cache and from backend and compare match ( cache.read_full(file_type, &id), @@ -172,8 +161,7 @@ async fn check_cache_files( } p.inc(size as u64); - }) - .await; + }); p.finish(); Ok(())