diff --git a/src/commands/restore.rs b/src/commands/restore.rs index b22f663..cc7103d 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -4,7 +4,8 @@ use std::path::PathBuf; use anyhow::{anyhow, Result}; use clap::Parser; use derive_getters::Dissolve; -use futures::StreamExt; +use futures::{stream::FuturesUnordered, TryStreamExt}; +use tokio::spawn; use vlog::*; use super::progress_counter; @@ -60,8 +61,7 @@ async fn allocate_and_collect( let mut file_infos = FileInfos::new(); let mut node_streamer = NodeStreamer::new(index.clone(), tree).await?; - while let Some(item) = node_streamer.next().await { - let (path, node) = item?; + while let Some((path, node)) = node_streamer.try_next().await? { match node.node_type() { NodeType::Dir => { if !opts.dry_run { @@ -92,20 +92,45 @@ async fn restore_contents( opts: &Opts, ) -> Result<()> { let (filenames, restore_info) = file_infos.dissolve(); + + v1!("processing blobs..."); + let p = progress_counter(); + p.set_length(restore_info.iter().map(|(_, blob)| blob.len() as u64).sum()); + let stream = FuturesUnordered::new(); + for (pack, blob) in restore_info { for (bl, fls) in blob { - // read pack at blob_offset with length blob_length - let data = be - .read_encrypted_partial(FileType::Pack, &pack, bl.offset, bl.length) - .await?; - for fl in fls { - // save in file at file_start - if !opts.dry_run { - dest.write_at(&filenames[fl.file_idx], fl.file_start, &data); + let p = p.clone(); + let be = be.clone(); + let dest = dest.clone(); + let dry_run = opts.dry_run; + let name_dests: Vec<_> = fls + .iter() + .map(|fl| (filenames[fl.file_idx].clone(), fl.file_start)) + .collect(); + + // TODO: error handling! + stream.push(spawn(async move { + // read pack at blob_offset with length blob_length + let data = be + .read_encrypted_partial(FileType::Pack, &pack, bl.offset, bl.length) + .await + .unwrap(); + + if !dry_run { + // save into needed files in parallel + for (name, start) in name_dests { + dest.write_at(&name, start, &data); + } } - } + p.inc(1); + })) } } + + stream.try_collect().await?; + p.finish(); + Ok(()) } @@ -117,8 +142,7 @@ async fn restore_metadata( ) -> Result<()> { // walk over tree in repository and compare with tree in dest let mut node_streamer = NodeStreamer::new(index, tree).await?; - while let Some(item) = node_streamer.next().await { - let (path, node) = item?; + while let Some((path, node)) = node_streamer.try_next().await? { if !opts.dry_run { if let NodeType::Symlink { linktarget } = node.node_type() { dest.create_symlink(&path, linktarget);