From b0db97854e8ad78b544b2ff9e8d05b69b4f94f21 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Tue, 12 Jul 2022 10:22:05 +0200 Subject: [PATCH] Limit concurrent operations --- src/backend/decrypt.rs | 47 +++++++++++++++++++---------------------- src/commands/check.rs | 46 ++++++++++++++++++---------------------- src/commands/restore.rs | 7 +++++- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index b402b5c..3f996c7 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -2,7 +2,7 @@ use std::fs::File; use anyhow::{bail, Result}; use async_trait::async_trait; -use futures::{stream::FuturesUnordered, TryStreamExt}; +use futures::{stream, stream::FuturesUnordered, StreamExt}; use indicatif::ProgressBar; use tokio::{spawn, task::JoinHandle}; use zstd::stream::{copy_encode, decode_all}; @@ -74,36 +74,33 @@ pub trait DecryptWriteBackend: WriteBackend { async fn save_list(&self, list: Vec, p: ProgressBar) -> Result<()> { p.set_length(list.len() as u64); - list.into_iter() - .map(|file| { - let be = self.clone(); - let p = p.clone(); - spawn(async move { - be.save_file(&file).await.unwrap(); - p.inc(1); - }) - }) - .collect::>() - .try_collect() - .await?; + stream::iter(list.into_iter().map(|file| { + let be = self.clone(); + let p = p.clone(); + (file, be, p) + })) + .for_each_concurrent(5, |(file, be, p)| async move { + be.save_file(&file).await.unwrap(); + p.inc(1); + }) + .await; p.finish(); Ok(()) } async fn delete_list(&self, tpe: FileType, list: Vec, p: ProgressBar) -> Result<()> { p.set_length(list.len() as u64); - list.into_iter() - .map(|id| { - let be = self.clone(); - let p = p.clone(); - spawn(async move { - be.remove(tpe, &id).await.unwrap(); - p.inc(1); - }) - }) - .collect::>() - .try_collect() - .await?; + stream::iter(list.into_iter().map(|id| { + let be = self.clone(); + let p = p.clone(); + (id, be, p) + })) + .for_each_concurrent(20, |(id, be, p)| async move { + be.remove(tpe, &id).await.unwrap(); + p.inc(1); + }) + .await; + p.finish(); Ok(()) } diff --git a/src/commands/check.rs b/src/commands/check.rs index 40dd423..405bf91 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -2,8 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use clap::Parser; -use futures::{stream::FuturesUnordered, TryStreamExt}; -use tokio::spawn; +use futures::{stream, StreamExt, TryStreamExt}; use vlog::*; use super::{progress_bytes, progress_counter}; @@ -73,29 +72,26 @@ async fn check_cache_files( let total_size = files.iter().map(|(_, size)| *size as u64).sum(); p.set_length(total_size); - let stream: FuturesUnordered<_> = files - .into_iter() - .map(|(id, size)| { - let cache = cache.clone(); - let be = be.clone(); - let p = p.clone(); - spawn(async move { - // Read file from cache and from backend and compare - // TODO: Use (Async)Readers and compare using them! - let data_cached = cache.read_full(file_type, &id).await.unwrap(); - let data = be.read_full(file_type, &id).await.unwrap(); - if data_cached != data { - eprintln!( - "Cached file Type: {:?}, Id: {} is not identical to backend!", - file_type, id - ); - } - p.inc(size as u64); - }) - }) - .collect(); - - stream.try_collect().await?; + stream::iter(files.into_iter().map(|file| { + let cache = cache.clone(); + let be = be.clone(); + let p = p.clone(); + (file, cache, be, p) + })) + .for_each_concurrent(5, |((id, size), cache, be, p)| async move { + // Read file from cache and from backend and compare + // TODO: Use (Async)Readers and compare using them! + let data_cached = cache.read_full(file_type, &id).await.unwrap(); + let data = be.read_full(file_type, &id).await.unwrap(); + if data_cached != data { + eprintln!( + "Cached file Type: {:?}, Id: {} is not identical to backend!", + file_type, id + ); + } + p.inc(size as u64); + }) + .await; p.finish(); Ok(()) diff --git a/src/commands/restore.rs b/src/commands/restore.rs index c41ce99..2387ac2 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -101,8 +101,9 @@ async fn restore_contents( v1!("processing blobs..."); let p = progress_counter(); p.set_length(restore_info.iter().map(|(_, blob)| blob.len() as u64).sum()); - let stream = FuturesUnordered::new(); + let mut stream = FuturesUnordered::new(); + const MAX_READER: usize = 20; for (pack, blob) in restore_info { for (bl, fls) in blob { let p = p.clone(); @@ -114,6 +115,10 @@ async fn restore_contents( .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)) .collect(); + while stream.len() > MAX_READER { + stream.try_next().await?; + } + // TODO: error handling! stream.push(spawn(async move { // read pack at blob_offset with length blob_length