Merge pull request #90 from rustic-rs/restore-use-existing

restore: use existing fileparts
This commit is contained in:
aawsome 2022-07-29 10:41:11 +02:00 committed by GitHub
commit 65a410efbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 41 deletions

View File

@ -289,6 +289,29 @@ impl LocalBackend {
Ok(())
}
pub fn read_at(&self, item: impl AsRef<Path>, offset: u64, length: u64) -> Result<Vec<u8>> {
let filename = self.path.join(item);
let mut file = File::open(&filename)?;
file.seek(SeekFrom::Start(offset))?;
let mut vec = vec![0; length.try_into().unwrap()];
file.read_exact(&mut vec).unwrap();
Ok(vec)
}
pub fn get_matching_file(&self, item: impl AsRef<Path>, size: u64) -> Option<File> {
let filename = self.path.join(item);
match fs::symlink_metadata(&filename) {
Ok(meta) => {
if meta.is_file() && meta.len() == size {
File::open(&filename).ok()
} else {
None
}
}
Err(_) => None,
}
}
pub fn write_at(&self, item: impl AsRef<Path>, offset: u64, data: &[u8]) -> Result<()> {
let filename = self.path.join(item);
let file = fs::OpenOptions::new()

View File

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::io::Read;
use std::num::NonZeroU32;
use std::path::PathBuf;
@ -9,9 +10,10 @@ use futures::{stream::FuturesUnordered, TryStreamExt};
use tokio::spawn;
use vlog::*;
use super::{progress_bytes, progress_counter};
use super::{bytes, progress_bytes, progress_counter};
use crate::backend::{DecryptReadBackend, FileType, LocalBackend};
use crate::blob::{Node, NodeStreamer, NodeType};
use crate::crypto::hash;
use crate::id::Id;
use crate::index::{IndexBackend, IndexedBackend};
use crate::repo::SnapshotFile;
@ -43,11 +45,22 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
let dest = LocalBackend::new(&opts.dest);
let index = IndexBackend::new(be, progress_counter()).await?;
v1!("allocating dirs/files and collecting restore information...");
v1!("collecting restore information and allocating non-existing files...");
let file_infos = allocate_and_collect(&dest, index.clone(), snap.tree, &opts).await?;
v1!("total restore size: {}", bytes(file_infos.total_size));
if file_infos.matched_size > 0 {
v1!(
"using {} of existing file contents.",
bytes(file_infos.matched_size)
);
}
v1!("restoring file contents...");
restore_contents(be, &dest, file_infos, &opts).await?;
if file_infos.total_size == file_infos.matched_size {
v1!("all file contents are fine.");
} else {
v1!("restoring missing file contents...");
restore_contents(be, &dest, file_infos, &opts).await?;
}
v1!("setting metadata...");
restore_metadata(&dest, index, snap.tree, &opts).await?;
@ -56,7 +69,7 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
Ok(())
}
/// allocate files, scan or remove existing files and collect restore information
/// collect restore information, scan existing files and allocate non-existing files
async fn allocate_and_collect(
dest: &LocalBackend,
index: impl IndexedBackend + Unpin,
@ -76,10 +89,11 @@ async fn allocate_and_collect(
}
NodeType::File => {
// collect blobs needed for restoring
let size = file_infos.add_file(&node, path.clone(), &index)?;
// create the file
if !opts.dry_run {
dest.create_file(&path, size)?;
if let Some(size) = file_infos.add_file(dest, &node, path.clone(), &index)? {
if !opts.dry_run {
// create the file if it doesn't exist with right size
dest.create_file(&path, size)?;
}
}
}
_ => {} // nothing to do for symlink, device, etc.
@ -97,17 +111,10 @@ async fn restore_contents(
file_infos: FileInfos,
opts: &Opts,
) -> Result<()> {
let (filenames, restore_info) = file_infos.dissolve();
let (filenames, restore_info, total_size, matched_size) = file_infos.dissolve();
v1!("processing blobs...");
let p = progress_bytes();
p.set_length(
restore_info
.iter()
.flat_map(|(_, blob)| blob)
.map(|(bl, fl)| bl.data_length() * fl.len() as u64)
.sum(),
);
p.set_length(total_size - matched_size);
let mut stream = FuturesUnordered::new();
const MAX_READER: usize = 20;
@ -116,39 +123,52 @@ async fn restore_contents(
let p = p.clone();
let be = be.clone();
let dest = dest.clone();
let dry_run = opts.dry_run;
let from_file = fls
.iter()
.find(|fl| fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start));
let name_dests: Vec<_> = fls
.iter()
.filter(|fl| !fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
while stream.len() > MAX_READER {
stream.try_next().await?;
}
if !opts.dry_run & !name_dests.is_empty() {
while stream.len() > MAX_READER {
stream.try_next().await?;
}
// TODO: error handling!
stream.push(spawn(async move {
// read pack at blob_offset with length blob_length
let data = be
.read_encrypted_partial(
FileType::Pack,
&pack,
false,
bl.offset,
bl.length,
bl.uncompressed_length,
)
.await
.unwrap();
// TODO: error handling!
stream.push(spawn(async move {
let data = match from_file {
Some((filename, start)) => {
// read from existing file
dest.read_at(filename, start, bl.data_length()).unwrap()
}
None => {
// read pack at blob_offset with length blob_length
be.read_encrypted_partial(
FileType::Pack,
&pack,
false,
bl.offset,
bl.length,
bl.uncompressed_length,
)
.await
.unwrap()
}
};
if !dry_run {
// save into needed files in parallel
for (name, start) in name_dests {
dest.write_at(&name, start, &data).unwrap();
p.inc(bl.data_length());
}
}
}))
}))
}
}
}
@ -221,6 +241,8 @@ fn set_metadata(dest: &LocalBackend, path: &PathBuf, node: &Node, opts: &Opts) {
struct FileInfos {
names: Filenames,
r: RestoreInfo,
total_size: u64,
matched_size: u64,
}
type RestoreInfo = HashMap<Id, HashMap<BlobLocation, Vec<FileLocation>>>;
@ -247,6 +269,7 @@ impl BlobLocation {
struct FileLocation {
file_idx: usize,
file_start: u64,
matches: bool, //indicates that the file exists and these contents are already correct
}
impl FileInfos {
@ -254,12 +277,21 @@ impl FileInfos {
Self {
names: Vec::new(),
r: HashMap::new(),
total_size: 0,
matched_size: 0,
}
}
/// Add the file to FilesInfos using index to get blob information.
/// Returns the computed length of the file
fn add_file(&mut self, file: &Node, name: PathBuf, index: &impl IndexedBackend) -> Result<u64> {
fn add_file(
&mut self,
dest: &LocalBackend,
file: &Node,
name: PathBuf,
index: &impl IndexedBackend,
) -> Result<Option<u64>> {
let mut open_file = dest.get_matching_file(&name, *file.meta().size());
let mut file_pos = 0;
if !file.content().is_empty() {
let file_idx = self.names.len();
@ -274,16 +306,33 @@ impl FileInfos {
uncompressed_length: *ie.uncompressed_length(),
};
let matches = match &mut open_file {
Some(file) => {
// Existing file content; check if SHA256 matches
let mut vec = vec![0; ie.data_length() as usize];
file.read_exact(&mut vec).is_ok() && id == &hash(&vec)
}
None => false,
};
let length = bl.data_length();
self.total_size += length;
if matches {
self.matched_size += length;
}
let pack = self.r.entry(*ie.pack()).or_insert_with(HashMap::new);
let blob_location = pack.entry(bl).or_insert_with(Vec::new);
blob_location.push(FileLocation {
file_idx,
file_start: file_pos,
matches,
});
file_pos += ie.data_length() as u64;
}
}
Ok(file_pos)
// Tell to allocate the size only if the file does NOT exist with matching size
Ok(open_file.is_none().then(|| file_pos))
}
}