restore: Parallelize and add progress bar

This commit is contained in:
Alexander Weiss 2022-05-21 03:13:59 +02:00
parent 6eb55bc39c
commit c4e1728246

View File

@ -4,7 +4,8 @@ use std::path::PathBuf;
use anyhow::{anyhow, Result};
use clap::Parser;
use derive_getters::Dissolve;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, TryStreamExt};
use tokio::spawn;
use vlog::*;
use super::progress_counter;
@ -60,8 +61,7 @@ async fn allocate_and_collect(
let mut file_infos = FileInfos::new();
let mut node_streamer = NodeStreamer::new(index.clone(), tree).await?;
while let Some(item) = node_streamer.next().await {
let (path, node) = item?;
while let Some((path, node)) = node_streamer.try_next().await? {
match node.node_type() {
NodeType::Dir => {
if !opts.dry_run {
@ -92,20 +92,45 @@ async fn restore_contents(
opts: &Opts,
) -> Result<()> {
let (filenames, restore_info) = file_infos.dissolve();
v1!("processing blobs...");
let p = progress_counter();
p.set_length(restore_info.iter().map(|(_, blob)| blob.len() as u64).sum());
let stream = FuturesUnordered::new();
for (pack, blob) in restore_info {
for (bl, fls) in blob {
// read pack at blob_offset with length blob_length
let data = be
.read_encrypted_partial(FileType::Pack, &pack, bl.offset, bl.length)
.await?;
for fl in fls {
// save in file at file_start
if !opts.dry_run {
dest.write_at(&filenames[fl.file_idx], fl.file_start, &data);
let p = p.clone();
let be = be.clone();
let dest = dest.clone();
let dry_run = opts.dry_run;
let name_dests: Vec<_> = fls
.iter()
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
// TODO: error handling!
stream.push(spawn(async move {
// read pack at blob_offset with length blob_length
let data = be
.read_encrypted_partial(FileType::Pack, &pack, bl.offset, bl.length)
.await
.unwrap();
if !dry_run {
// save into needed files in parallel
for (name, start) in name_dests {
dest.write_at(&name, start, &data);
}
}
}
p.inc(1);
}))
}
}
stream.try_collect().await?;
p.finish();
Ok(())
}
@ -117,8 +142,7 @@ async fn restore_metadata(
) -> Result<()> {
// walk over tree in repository and compare with tree in dest
let mut node_streamer = NodeStreamer::new(index, tree).await?;
while let Some(item) = node_streamer.next().await {
let (path, node) = item?;
while let Some((path, node)) = node_streamer.try_next().await? {
if !opts.dry_run {
if let NodeType::Symlink { linktarget } = node.node_type() {
dest.create_symlink(&path, linktarget);