Limit concurrent operations

This commit is contained in:
Alexander Weiss 2022-07-12 10:22:05 +02:00
parent 68cd21a2af
commit b0db97854e
3 changed files with 49 additions and 51 deletions

View File

@ -2,7 +2,7 @@ use std::fs::File;
use anyhow::{bail, Result};
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, TryStreamExt};
use futures::{stream, stream::FuturesUnordered, StreamExt};
use indicatif::ProgressBar;
use tokio::{spawn, task::JoinHandle};
use zstd::stream::{copy_encode, decode_all};
@ -74,36 +74,33 @@ pub trait DecryptWriteBackend: WriteBackend {
async fn save_list<F: RepoFile>(&self, list: Vec<F>, p: ProgressBar) -> Result<()> {
p.set_length(list.len() as u64);
list.into_iter()
.map(|file| {
let be = self.clone();
let p = p.clone();
spawn(async move {
be.save_file(&file).await.unwrap();
p.inc(1);
})
})
.collect::<FuturesUnordered<_>>()
.try_collect()
.await?;
stream::iter(list.into_iter().map(|file| {
let be = self.clone();
let p = p.clone();
(file, be, p)
}))
.for_each_concurrent(5, |(file, be, p)| async move {
be.save_file(&file).await.unwrap();
p.inc(1);
})
.await;
p.finish();
Ok(())
}
async fn delete_list(&self, tpe: FileType, list: Vec<Id>, p: ProgressBar) -> Result<()> {
p.set_length(list.len() as u64);
list.into_iter()
.map(|id| {
let be = self.clone();
let p = p.clone();
spawn(async move {
be.remove(tpe, &id).await.unwrap();
p.inc(1);
})
})
.collect::<FuturesUnordered<_>>()
.try_collect()
.await?;
stream::iter(list.into_iter().map(|id| {
let be = self.clone();
let p = p.clone();
(id, be, p)
}))
.for_each_concurrent(20, |(id, be, p)| async move {
be.remove(tpe, &id).await.unwrap();
p.inc(1);
})
.await;
p.finish();
Ok(())
}

View File

@ -2,8 +2,7 @@ use std::collections::HashMap;
use anyhow::Result;
use clap::Parser;
use futures::{stream::FuturesUnordered, TryStreamExt};
use tokio::spawn;
use futures::{stream, StreamExt, TryStreamExt};
use vlog::*;
use super::{progress_bytes, progress_counter};
@ -73,29 +72,26 @@ async fn check_cache_files(
let total_size = files.iter().map(|(_, size)| *size as u64).sum();
p.set_length(total_size);
let stream: FuturesUnordered<_> = files
.into_iter()
.map(|(id, size)| {
let cache = cache.clone();
let be = be.clone();
let p = p.clone();
spawn(async move {
// Read file from cache and from backend and compare
// TODO: Use (Async)Readers and compare using them!
let data_cached = cache.read_full(file_type, &id).await.unwrap();
let data = be.read_full(file_type, &id).await.unwrap();
if data_cached != data {
eprintln!(
"Cached file Type: {:?}, Id: {} is not identical to backend!",
file_type, id
);
}
p.inc(size as u64);
})
})
.collect();
stream.try_collect().await?;
stream::iter(files.into_iter().map(|file| {
let cache = cache.clone();
let be = be.clone();
let p = p.clone();
(file, cache, be, p)
}))
.for_each_concurrent(5, |((id, size), cache, be, p)| async move {
// Read file from cache and from backend and compare
// TODO: Use (Async)Readers and compare using them!
let data_cached = cache.read_full(file_type, &id).await.unwrap();
let data = be.read_full(file_type, &id).await.unwrap();
if data_cached != data {
eprintln!(
"Cached file Type: {:?}, Id: {} is not identical to backend!",
file_type, id
);
}
p.inc(size as u64);
})
.await;
p.finish();
Ok(())

View File

@ -101,8 +101,9 @@ async fn restore_contents(
v1!("processing blobs...");
let p = progress_counter();
p.set_length(restore_info.iter().map(|(_, blob)| blob.len() as u64).sum());
let stream = FuturesUnordered::new();
let mut stream = FuturesUnordered::new();
const MAX_READER: usize = 20;
for (pack, blob) in restore_info {
for (bl, fls) in blob {
let p = p.clone();
@ -114,6 +115,10 @@ async fn restore_contents(
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
while stream.len() > MAX_READER {
stream.try_next().await?;
}
// TODO: error handling!
stream.push(spawn(async move {
// read pack at blob_offset with length blob_length