Make restore sync

This commit is contained in:
Alexander Weiss 2022-10-23 08:14:20 +02:00
parent 181ab5da2f
commit de0bd189a8

View File

@ -7,10 +7,9 @@ use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Result};
use clap::{AppSettings, Parser};
use derive_getters::Dissolve;
use futures::{stream::FuturesUnordered, TryStreamExt};
use ignore::{DirEntry, WalkBuilder};
use log::*;
use tokio::spawn;
use rayon::ThreadPoolBuilder;
use super::{bytes, progress_bytes, progress_counter, wait, warm_up, warm_up_command};
use crate::backend::{DecryptReadBackend, FileType, LocalBackend};
@ -75,7 +74,7 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
let dest = LocalBackend::new(&opts.dest);
let p = progress_spinner("collecting file information...");
let file_infos = allocate_and_collect(&dest, index.clone(), tree, &opts).await?;
let file_infos = allocate_and_collect(&dest, index.clone(), tree, &opts)?;
p.finish();
info!("total restore size: {}", bytes(file_infos.total_size));
if file_infos.matched_size > 0 {
@ -98,13 +97,13 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
}
wait(opts.warm_up_wait).await;
if !opts.dry_run {
restore_contents(be, &dest, file_infos).await?;
restore_contents(be, &dest, file_infos)?;
}
}
if !opts.dry_run {
let p = progress_spinner("setting metadata...");
restore_metadata(&dest, index, tree, &opts).await?;
restore_metadata(&dest, index, tree, &opts)?;
p.finish();
}
@ -113,7 +112,7 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
}
/// collect restore information, scan existing files and allocate non-existing files
async fn allocate_and_collect(
fn allocate_and_collect(
dest: &LocalBackend,
index: impl IndexedBackend + Unpin,
tree: Id,
@ -272,7 +271,7 @@ async fn allocate_and_collect(
/// restore_contents restores all files contents as described by file_infos
/// using the ReadBackend be and writing them into the LocalBackend dest.
async fn restore_contents(
fn restore_contents(
be: &impl DecryptReadBackend,
dest: &LocalBackend,
file_infos: FileInfos,
@ -281,69 +280,67 @@ async fn restore_contents(
let p = progress_bytes("restoring file contents...");
p.set_length(total_size - matched_size);
let mut stream = FuturesUnordered::new();
const MAX_READER: usize = 20;
for (pack, blob) in restore_info {
for (bl, fls) in blob {
let p = p.clone();
let be = be.clone();
let dest = dest.clone();
let pool = ThreadPoolBuilder::new().num_threads(MAX_READER).build()?;
pool.in_place_scope(|s| {
for (pack, blob) in restore_info {
for (bl, fls) in blob {
let from_file = fls
.iter()
.find(|fl| fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start));
let from_file = fls
.iter()
.find(|fl| fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start));
let name_dests: Vec<_> = fls
.iter()
.filter(|fl| !fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
let p = &p;
let name_dests: Vec<_> = fls
.iter()
.filter(|fl| !fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
if !name_dests.is_empty() {
// TODO: error handling!
s.spawn(move |s1| {
let data = match from_file {
Some((filename, start)) => {
// read from existing file
dest.read_at(filename, start, bl.data_length()).unwrap()
}
None => {
// read pack at blob_offset with length blob_length
be.read_encrypted_partial(
FileType::Pack,
&pack,
false,
bl.offset,
bl.length,
bl.uncompressed_length,
)
.unwrap()
}
};
let size = bl.data_length();
if !name_dests.is_empty() {
while stream.len() > MAX_READER {
stream.try_next().await?;
// save into needed files in parallel
for (name, start) in name_dests {
let data = data.clone();
s1.spawn(move |_| {
dest.write_at(&name, start, &data).unwrap();
p.inc(size);
});
}
})
}
// TODO: error handling!
stream.push(spawn(async move {
let data = match from_file {
Some((filename, start)) => {
// read from existing file
dest.read_at(filename, start, bl.data_length()).unwrap()
}
None => {
// read pack at blob_offset with length blob_length
be.read_encrypted_partial(
FileType::Pack,
&pack,
false,
bl.offset,
bl.length,
bl.uncompressed_length,
)
.unwrap()
}
};
// save into needed files in parallel
for (name, start) in name_dests {
dest.write_at(&name, start, &data).unwrap();
p.inc(bl.data_length());
}
}))
}
}
}
});
stream.try_collect().await?;
p.finish();
Ok(())
}
async fn restore_metadata(
fn restore_metadata(
dest: &LocalBackend,
index: impl IndexedBackend + Unpin,
tree: Id,