Merge pull request #258 from rustic-rs/repair-snapshots

Add repair snapshots command
This commit is contained in:
aawsome 2022-10-14 23:26:38 +02:00 committed by GitHub
commit 7e9fe331e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 276 additions and 14 deletions

12
Cargo.lock generated
View File

@ -85,6 +85,17 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfb6d71005dc22a708c7496eee5c8dc0300ee47355de6256c3b35b12b5fef596"
[[package]]
name = "async-recursion"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "async-trait"
version = "0.1.57"
@ -1562,6 +1573,7 @@ dependencies = [
"aes256ctr_poly1305aes",
"ambassador",
"anyhow",
"async-recursion",
"async-trait",
"backoff",
"base64",

View File

@ -24,6 +24,7 @@ strip = true
[dependencies]
# macros
async-trait = "0.1"
async-recursion = "1"
anyhow = "1"
ambassador = "0.2"
thiserror = "1"

View File

@ -32,6 +32,7 @@ Improvements:
* Huge decrease in memory requirement
* Already faster than restic for most operations (but not yet fully speed optimized)
* Cleaner concept of logging output; possibility to write logs to a log file
* `rustic repair` command allows to repair some kinds of broken repositories
* `backup` command can use `.gitignore` files
* `restore` uses existing files; also option `--delete` available
* Snapshots save much more information, available in `snapshots` command

View File

@ -147,9 +147,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
pub async fn finish_trees(&mut self, path: &Path) -> Result<()> {
while !path.starts_with(&self.path) {
// save tree and go back to parent dir
let mut chunk = self.tree.serialize()?;
chunk.push(b'\n'); // for whatever reason, restic adds a newline, so to be compatible...
let id = hash(&chunk);
let (chunk, id) = self.tree.serialize()?;
let (mut node, tree, parent) = self
.stack
@ -286,8 +284,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
pub async fn finalize_snapshot(mut self) -> Result<SnapshotFile> {
self.finish_trees(&PathBuf::from("/")).await?;
let chunk = self.tree.serialize()?;
let id = hash(&chunk);
let (chunk, id) = self.tree.serialize()?;
if !self.index.has_tree(&id) {
self.tree_packer.add(&chunk, &id).await?;
}

View File

@ -14,6 +14,7 @@ use indicatif::ProgressBar;
use serde::{Deserialize, Deserializer, Serialize};
use tokio::{spawn, task::JoinHandle};
use crate::crypto::hash;
use crate::id::Id;
use crate::index::IndexedBackend;
@ -43,8 +44,11 @@ impl Tree {
self.nodes.push(node)
}
pub fn serialize(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_vec(&self)?)
/// Serializes the tree to JSON and computes the id of the serialized data.
///
/// Returns the serialized bytes together with the hash taken over exactly
/// those bytes (including the trailing newline).
pub fn serialize(&self) -> Result<(Vec<u8>, Id)> {
    let mut bytes = serde_json::to_vec(self)?;
    // restic terminates the serialized tree with a newline; do the same to stay compatible
    bytes.push(b'\n');
    let tree_id = hash(&bytes);
    Ok((bytes, tree_id))
}
pub async fn from_backend(be: &impl IndexedBackend, id: Id) -> Result<Self> {
@ -76,6 +80,15 @@ impl Tree {
}
}
/// Allows consuming a `Tree` directly in a `for` loop: iterating yields the
/// tree's nodes by value, in the order they are stored in `nodes`.
impl IntoIterator for Tree {
type Item = Node;
type IntoIter = std::vec::IntoIter<Node>;
// Consumes the tree and hands out the iterator of its node vector.
fn into_iter(self) -> Self::IntoIter {
self.nodes.into_iter()
}
}
/// NodeStreamer recursively streams all nodes of a given tree including all subtrees in-order
pub struct NodeStreamer<BE>
where

View File

@ -322,7 +322,7 @@ pub async fn execute() -> Result<()> {
Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file).await?,
Command::Prune(opts) => prune::execute(&dbe, cache, opts, config, vec![]).await?,
Command::Restore(opts) => restore::execute(&dbe, opts).await?,
Command::Repair(opts) => repair::execute(&dbe, opts).await?,
Command::Repair(opts) => repair::execute(&dbe, opts, config_file, &config).await?,
Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts).await?,
Command::Tag(opts) => tag::execute(&dbe, opts, config_file).await?,
};

View File

@ -1,13 +1,21 @@
use std::collections::{HashMap, HashSet};
use anyhow::Result;
use clap::{Parser, Subcommand};
use async_recursion::async_recursion;
use clap::{AppSettings, Parser, Subcommand};
use futures::TryStreamExt;
use log::*;
use std::collections::HashMap;
use crate::backend::{DecryptFullBackend, FileType};
use crate::index::Indexer;
use crate::repo::{IndexFile, IndexPack, PackHeader, PackHeaderRef};
use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType};
use crate::blob::{BlobType, NodeType, Packer, Tree};
use crate::id::Id;
use crate::index::{IndexBackend, IndexedBackend, Indexer, ReadIndex};
use crate::repo::{
ConfigFile, IndexFile, IndexPack, PackHeader, PackHeaderRef, SnapshotFile, SnapshotFilter,
StringList,
};
use super::rustic_config::RusticConfig;
use super::{progress_counter, progress_spinner, wait, warm_up, warm_up_command};
#[derive(Parser)]
@ -20,6 +28,8 @@ pub(super) struct Opts {
enum Command {
/// Repair the repository index
Index(IndexOpts),
/// Repair snapshots
Snapshots(SnapOpts),
}
#[derive(Default, Parser)]
@ -45,9 +55,42 @@ struct IndexOpts {
warm_up_wait: Option<humantime::Duration>,
}
pub(super) async fn execute(be: &impl DecryptFullBackend, opts: Opts) -> Result<()> {
// Command line options of the `repair snapshots` subcommand.
// NOTE: the `///` comments on the fields below are picked up by clap as help
// texts, so they are user-facing strings.
#[derive(Default, Parser)]
#[clap(global_setting(AppSettings::DeriveDisplayOrder))]
struct SnapOpts {
    #[clap(flatten, help_heading = "SNAPSHOT FILTER OPTIONS")]
    filter: SnapshotFilter,

    /// Only show what would be repaired
    #[clap(long, short = 'n')]
    dry_run: bool,

    // No short flag for `--delete`: `-n` already belongs to `--dry-run`, and
    // clap requires short flags to be unique within one command. A destructive
    // option must never share its flag with the "do nothing" switch.
    /// Also remove defect snapshots - WARNING: This can result in data loss!
    #[clap(long)]
    delete: bool,

    /// Append this suffix to repaired directory or file name
    #[clap(long, value_name = "SUFFIX", default_value = ".repaired")]
    suffix: String,

    /// Tag list to set on repaired snapshots (can be specified multiple times)
    #[clap(long, value_name = "TAG[,TAG,..]", default_value = "repaired")]
    tag: Vec<StringList>,

    /// Snapshots to repair. If none is given, use filter to filter from all snapshots.
    #[clap(value_name = "ID")]
    ids: Vec<String>,
}
/// Runs the `repair` subcommand selected on the command line.
///
/// `Command::Index` is handled by `repair_index`, `Command::Snapshots` by
/// `repair_snaps`; an error of the chosen handler is passed on to the caller.
pub(super) async fn execute(
    be: &impl DecryptFullBackend,
    opts: Opts,
    config_file: RusticConfig,
    config: &ConfigFile,
) -> Result<()> {
    match opts.command {
        Command::Index(index_opts) => repair_index(be, index_opts).await?,
        Command::Snapshots(snap_opts) => {
            repair_snaps(be, snap_opts, config_file, config).await?
        }
    }
    Ok(())
}
@ -167,3 +210,198 @@ async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<(
Ok(())
}
/// Checks the selected snapshots and writes repaired versions where needed.
///
/// The snapshots are either those given by `opts.ids` or, if no id is given,
/// all snapshots matching `opts.filter` (after merging the `snapshot-filter`
/// section of the config file into it). Each snapshot's root tree is run
/// through `repair_tree`:
///
/// * unchanged tree: the snapshot is left alone;
/// * damaged root tree: the snapshot is only marked for deletion;
/// * changed subtree: a modified snapshot (new tree, `original` set, tags from
///   `opts.tag`) is saved and the old snapshot is marked for deletion.
///
/// With `opts.dry_run` nothing is written or removed. Marked snapshots are
/// only removed if `opts.delete` is set.
async fn repair_snaps(
    be: &impl DecryptFullBackend,
    mut opts: SnapOpts,
    config_file: RusticConfig,
    config: &ConfigFile,
) -> Result<()> {
    config_file.merge_into("snapshot-filter", &mut opts.filter)?;

    // explicitly given ids take precedence over the snapshot filter
    let snapshots = if opts.ids.is_empty() {
        SnapshotFile::all_from_backend(be, &opts.filter).await?
    } else {
        SnapshotFile::from_ids(be, &opts.ids).await?
    };

    // bookkeeping shared by all snapshots, so every tree is processed only once
    let mut repaired_trees = HashMap::new();
    let mut intact_trees = HashSet::new();
    let mut obsolete_snaps = Vec::new();

    let index = IndexBackend::new(&be.clone(), progress_counter("")).await?;
    let indexer = Indexer::new(be.clone()).into_shared();
    let mut packer = Packer::new(
        be.clone(),
        BlobType::Tree,
        indexer.clone(),
        config,
        index.total_size(&BlobType::Tree),
    )?;

    for mut snap in snapshots {
        let snap_id = snap.id;
        info!("processing snapshot {snap_id}");

        let (state, new_tree_id) = repair_tree(
            &index,
            &mut packer,
            Some(snap.tree),
            &mut repaired_trees,
            &mut intact_trees,
            &opts,
        )
        .await?;

        match state {
            Changed::None => {
                info!("snapshot {snap_id} is ok.");
            }
            Changed::This => {
                warn!("snapshot {snap_id}: root tree is damaged -> marking for deletion!");
                obsolete_snaps.push(snap_id);
            }
            Changed::SubTree => {
                // remember the first ancestor if this snapshot was not derived already
                if snap.original.is_none() {
                    snap.original = Some(snap.id);
                }
                snap.set_tags(opts.tag.clone());
                snap.tree = new_tree_id;

                if opts.dry_run {
                    info!("would have modified snapshot {snap_id}.");
                } else {
                    let new_id = be.save_file(&snap).await?;
                    info!("saved modified snapshot as {new_id}.");
                }
                obsolete_snaps.push(snap_id);
            }
        }
    }

    // flush the newly written trees and their index entries
    if !opts.dry_run {
        packer.finalize().await?;
        indexer.write().await.finalize().await?;
    }

    match (opts.delete, opts.dry_run) {
        (false, _) => {}
        (true, true) => {
            info!("would have removed {} snapshots.", obsolete_snaps.len());
        }
        (true, false) => {
            be.delete_list(
                FileType::Snapshot,
                true,
                obsolete_snaps,
                progress_counter("remove defect snapshots"),
            )
            .await?;
        }
    }

    Ok(())
}
/// Result of checking a tree in `repair_tree`.
#[derive(Clone, Copy)]
enum Changed {
/// The tree itself was missing (no id) or could not be loaded; it was
/// replaced by a tree built from `Tree::new()`.
This,
/// The tree could be loaded, but a file or directory node inside it had to
/// be modified, so a new tree was created.
SubTree,
/// Nothing had to be changed; the original tree id stays valid.
None,
}
#[async_recursion]
async fn repair_tree<BE: DecryptWriteBackend>(
be: &impl IndexedBackend,
packer: &mut Packer<BE>,
id: Option<Id>,
replaced: &mut HashMap<Id, (Changed, Id)>,
seen: &mut HashSet<Id>,
opts: &SnapOpts,
) -> Result<(Changed, Id)> {
let (tree, changed) = match id {
None => (Tree::new(), Changed::This),
Some(id) => {
if seen.contains(&id) {
return Ok((Changed::None, id));
}
if let Some(r) = replaced.get(&id) {
return Ok(*r);
}
let (tree, mut changed) = match Tree::from_backend(be, id).await {
Ok(tree) => (tree, Changed::None),
Err(_) => {
warn!("tree {id} could not be loaded.");
(Tree::new(), Changed::This)
}
};
let mut new_tree = Tree::new();
for mut node in tree {
match node.node_type {
NodeType::File {} => {
let mut file_changed = false;
let mut new_content = Vec::new();
let mut new_size = 0;
for blob in node.content.take().unwrap() {
match be.get_data(&blob) {
Some(ie) => {
new_content.push(blob);
new_size += ie.data_length() as u64;
}
None => {
file_changed = true;
}
}
}
if file_changed {
warn!("file {}: contents are missing", node.name);
node.name += &opts.suffix;
changed = Changed::SubTree;
} else if new_size != node.meta.size {
info!("file {}: corrected file size", node.name);
changed = Changed::SubTree;
}
node.content = Some(new_content);
node.meta.size = new_size;
}
NodeType::Dir {} => {
let (c, tree_id) =
repair_tree(be, packer, node.subtree, replaced, seen, opts).await?;
match c {
Changed::None => {}
Changed::This => {
warn!("dir {}: tree is missing", node.name);
node.subtree = Some(tree_id);
node.name += &opts.suffix;
changed = Changed::SubTree;
}
Changed::SubTree => {
node.subtree = Some(tree_id);
changed = Changed::SubTree;
}
}
}
_ => {} // Other types: no check needed
}
new_tree.add(node);
}
if let Changed::None = changed {
seen.insert(id);
}
(new_tree, changed)
}
};
match (id, changed) {
(None, Changed::None) => panic!("this should not happen!"),
(Some(id), Changed::None) => Ok((Changed::None, id)),
(_, c) => {
// the tree has been changed => save it
let (chunk, new_id) = tree.serialize()?;
if !be.has_tree(&new_id) && !opts.dry_run {
packer.add(&chunk, &new_id).await?;
}
if let Some(id) = id {
replaced.insert(id, (c, new_id));
}
Ok((c, new_id))
}
}
}