Parallelize check

This commit is contained in:
Alexander Weiss 2022-10-17 15:41:17 +02:00
parent 683f2bc26c
commit d8665fe794

View File

@ -3,10 +3,10 @@ use std::collections::HashMap;
use anyhow::Result;
use bytes::Bytes;
use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt};
use futures::TryStreamExt;
use indicatif::ProgressBar;
use log::*;
use tokio::task::spawn_blocking;
use rayon::prelude::*;
use zstd::stream::decode_all;
use super::{progress_bytes, progress_counter};
@ -49,7 +49,7 @@ pub(super) async fn execute(
let p = progress_bytes(format!("checking {} in cache...", file_type.name()));
// TODO: Make concurrency (20) customizable
check_cache_files(20, cache, raw_be, file_type, p).await?;
check_cache_files(20, cache, raw_be, file_type, p)?;
}
}
}
@ -66,7 +66,7 @@ pub(super) async fn execute(
if let Some(cache) = &cache {
let p = progress_bytes("checking packs in cache...");
// TODO: Make concurrency (5) customizable
check_cache_files(5, cache, raw_be, FileType::Pack, p).await?;
check_cache_files(5, cache, raw_be, FileType::Pack, p)?;
}
}
@ -76,26 +76,20 @@ pub(super) async fn execute(
if opts.read_data {
let p = progress_counter("reading pack data...");
stream::iter(index_be.into_index().into_iter().map(|pack| {
let be = be.clone();
let p = p.clone();
(pack, be, p)
}))
// TODO: Make concurrency (4) customizable
.for_each_concurrent(4, |(pack, be, p)| async move {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
spawn_blocking(move || {
match check_pack(&be, pack, data) {
index_be
.into_index()
.into_iter()
.par_bridge()
.for_each_with((be.clone(), p.clone()), |(be, p), pack| {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).unwrap();
match check_pack(be, pack, data) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
p.inc(1);
})
.await
.unwrap()
})
.await;
});
p.finish();
}
@ -133,8 +127,8 @@ fn check_hot_files(
Ok(())
}
async fn check_cache_files(
concurrency: usize,
fn check_cache_files(
_concurrency: usize,
cache: &Cache,
be: &impl ReadBackend,
file_type: FileType,
@ -149,13 +143,8 @@ async fn check_cache_files(
let total_size = files.iter().map(|(_, size)| *size as u64).sum();
p.set_length(total_size);
stream::iter(files.into_iter().map(|file| {
let cache = cache.clone();
let be = be.clone();
let p = p.clone();
(file, cache, be, p)
}))
.for_each_concurrent(concurrency, |((id, size), cache, be, p)| async move {
files.into_par_iter()
.for_each_with((cache,be,p.clone()), |(cache, be, p),(id, size)| {
// Read file from cache and from backend and compare
match (
cache.read_full(file_type, &id),
@ -172,8 +161,7 @@ async fn check_cache_files(
}
p.inc(size as u64);
})
.await;
});
p.finish();
Ok(())