diff --git a/src/commands/restore.rs b/src/commands/restore.rs index fff8f77..432918c 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -7,10 +7,9 @@ use std::path::{Path, PathBuf}; use anyhow::{anyhow, bail, Result}; use clap::{AppSettings, Parser}; use derive_getters::Dissolve; -use futures::{stream::FuturesUnordered, TryStreamExt}; use ignore::{DirEntry, WalkBuilder}; use log::*; -use tokio::spawn; +use rayon::ThreadPoolBuilder; use super::{bytes, progress_bytes, progress_counter, wait, warm_up, warm_up_command}; use crate::backend::{DecryptReadBackend, FileType, LocalBackend}; @@ -75,7 +74,7 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) let dest = LocalBackend::new(&opts.dest); let p = progress_spinner("collecting file information..."); - let file_infos = allocate_and_collect(&dest, index.clone(), tree, &opts).await?; + let file_infos = allocate_and_collect(&dest, index.clone(), tree, &opts)?; p.finish(); info!("total restore size: {}", bytes(file_infos.total_size)); if file_infos.matched_size > 0 { @@ -98,13 +97,13 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) } wait(opts.warm_up_wait).await; if !opts.dry_run { - restore_contents(be, &dest, file_infos).await?; + restore_contents(be, &dest, file_infos)?; } } if !opts.dry_run { let p = progress_spinner("setting metadata..."); - restore_metadata(&dest, index, tree, &opts).await?; + restore_metadata(&dest, index, tree, &opts)?; p.finish(); } @@ -113,7 +112,7 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) } /// collect restore information, scan existing files and allocate non-existing files -async fn allocate_and_collect( +fn allocate_and_collect( dest: &LocalBackend, index: impl IndexedBackend + Unpin, tree: Id, @@ -272,7 +271,7 @@ async fn allocate_and_collect( /// restore_contents restores all files contents as described by file_infos /// using the ReadBackend be and writing them into the LocalBackend dest. -async fn restore_contents( +fn restore_contents( be: &impl DecryptReadBackend, dest: &LocalBackend, file_infos: FileInfos, @@ -281,69 +280,67 @@ async fn restore_contents( let p = progress_bytes("restoring file contents..."); p.set_length(total_size - matched_size); - let mut stream = FuturesUnordered::new(); const MAX_READER: usize = 20; - for (pack, blob) in restore_info { - for (bl, fls) in blob { - let p = p.clone(); - let be = be.clone(); - let dest = dest.clone(); + let pool = ThreadPoolBuilder::new().num_threads(MAX_READER).build()?; + pool.in_place_scope(|s| { + for (pack, blob) in restore_info { + for (bl, fls) in blob { + let from_file = fls + .iter() + .find(|fl| fl.matches) + .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)); - let from_file = fls - .iter() - .find(|fl| fl.matches) - .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)); + let name_dests: Vec<_> = fls + .iter() + .filter(|fl| !fl.matches) + .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)) + .collect(); + let p = &p; - let name_dests: Vec<_> = fls - .iter() - .filter(|fl| !fl.matches) - .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)) - .collect(); + if !name_dests.is_empty() { + // TODO: error handling! + s.spawn(move |s1| { + let data = match from_file { + Some((filename, start)) => { + // read from existing file + dest.read_at(filename, start, bl.data_length()).unwrap() + } + None => { + // read pack at blob_offset with length blob_length + be.read_encrypted_partial( + FileType::Pack, + &pack, + false, + bl.offset, + bl.length, + bl.uncompressed_length, + ) + .unwrap() + } + }; + let size = bl.data_length(); - if !name_dests.is_empty() { - while stream.len() > MAX_READER { - stream.try_next().await?; + // save into needed files in parallel + for (name, start) in name_dests { + let data = data.clone(); + s1.spawn(move |_| { + dest.write_at(&name, start, &data).unwrap(); + p.inc(size); + }); + } + }) } - - // TODO: error handling! - stream.push(spawn(async move { - let data = match from_file { - Some((filename, start)) => { - // read from existing file - dest.read_at(filename, start, bl.data_length()).unwrap() - } - None => { - // read pack at blob_offset with length blob_length - be.read_encrypted_partial( - FileType::Pack, - &pack, - false, - bl.offset, - bl.length, - bl.uncompressed_length, - ) - .unwrap() - } - }; - - // save into needed files in parallel - for (name, start) in name_dests { - dest.write_at(&name, start, &data).unwrap(); - p.inc(bl.data_length()); - } - })) } } - } + }); - stream.try_collect().await?; p.finish(); Ok(()) } -async fn restore_metadata( +fn restore_metadata( dest: &LocalBackend, index: impl IndexedBackend + Unpin, tree: Id,