Add merge command

This commit is contained in:
Alexander Weiss 2023-01-30 22:12:55 +01:00
parent 83061b154e
commit a01d0b0174
6 changed files with 154 additions and 33 deletions

View File

@ -38,6 +38,7 @@ Improvements:
* `restore` uses existing files; also option `--delete` available
* Snapshots save much more information, available in `snapshots` command
* Allows saving repository options in the repository config file via the `config` command
* New command `merge`
* New command `repo-info`
* `check` command checks and uses cache; option `--trust-cache` is available
* Option `prune --fast-repack` for faster repacking

View File

@ -8,6 +8,7 @@ Bugs fixed:
New features:
- New command dump has been added.
- New command merge has been added.
- Extra or wrong fields in the config file now lead to rustic complaining and aborting.
- backup: Paths are now sanitized from command arguments and config file before matching and applying the configuration.
- check --read-data: progress bar now also shows total bytes to check and ETA.

View File

@ -13,6 +13,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use crate::crypto::hash;
use crate::id::Id;
use crate::index::IndexedBackend;
use crate::repofile::SnapshotSummary;
use super::{Metadata, Node, NodeType};
@ -260,8 +261,9 @@ impl Iterator for TreeStreamerOnce {
pub fn merge_trees(
be: &impl IndexedBackend,
trees: Vec<Id>,
cmp: &impl Fn(Node, Node) -> Ordering,
save: &impl Fn(Tree) -> Result<Id>,
cmp: &impl Fn(&Node, &Node) -> Ordering,
save: &impl Fn(Tree) -> Result<(Id, u64)>,
summary: &mut SnapshotSummary,
) -> Result<Id> {
// We store nodes together with the index of their tree in a binary heap which is sorted only by node name
struct SortedNode(Node, usize);
@ -282,18 +284,9 @@ pub fn merge_trees(
}
}
// TODO: Eliminate duplicate ids
// Handle simple special cases
match trees.len() {
0 => bail!("merge_trees: cannot merge 0 trees!"),
1 => return Ok(trees[0]),
_ => {}
}
let mut tree_iters: Vec<_> = trees
.into_iter()
.map(|id| Tree::from_backend(be, id).map(|tree| tree.into_iter()))
.iter()
.map(|id| Tree::from_backend(be, *id).map(|tree| tree.into_iter()))
.collect::<Result<_>>()?;
// fill Heap with first elements from all trees
@ -307,7 +300,11 @@ pub fn merge_trees(
let mut tree = Tree::new();
let (mut node, mut num) = match elems.pop() {
None => {
return save(tree);
let (id, size) = save(tree)?;
summary.dirs_unmodified += 1;
summary.total_dirs_processed += 1;
summary.total_dirsize_processed += size;
return Ok(id);
}
Some(SortedNode(node, num)) => (node, num),
};
@ -324,14 +321,14 @@ pub fn merge_trees(
// Add node to nodes list
nodes.push(node);
// no node left to proceed, merge nodes and quit
tree.add(merge_nodes(be, nodes, cmp, save)?);
tree.add(merge_nodes(be, nodes, cmp, save, summary)?);
break;
}
Some(SortedNode(new_node, new_num)) if node.name != new_node.name => {
// Add node to nodes list
nodes.push(node);
// next node has other name; merge present nodes
tree.add(merge_nodes(be, nodes, cmp, save)?);
tree.add(merge_nodes(be, nodes, cmp, save, summary)?);
nodes = Vec::new();
// use this node as new node
(node, num) = (new_node, new_num);
@ -344,36 +341,39 @@ pub fn merge_trees(
}
};
}
save(tree)
let (id, size) = save(tree)?;
if trees.contains(&id) {
summary.dirs_unmodified += 1;
} else {
summary.dirs_changed += 1;
}
summary.total_dirs_processed += 1;
summary.total_dirsize_processed += size;
Ok(id)
}
fn merge_nodes(
be: &impl IndexedBackend,
mut nodes: Vec<Node>,
cmp: &impl Fn(Node, Node) -> Ordering,
save: &impl Fn(Tree) -> Result<Id>,
nodes: Vec<Node>,
cmp: &impl Fn(&Node, &Node) -> Ordering,
save: &impl Fn(Tree) -> Result<(Id, u64)>,
summary: &mut SnapshotSummary,
) -> Result<Node> {
// Handle simple special cases
match nodes.len() {
0 => bail!("merge_nodes: cannot merge 0 nodes!"),
1 => return Ok(nodes.swap_remove(0)),
_ => {}
}
let trees: Vec<_> = nodes
.iter()
.filter(|node| node.is_dir())
.map(|node| node.subtree().unwrap())
.collect();
let mut node = nodes
.into_iter()
.max_by(|n1, n2| n1.meta.mtime.cmp(&n2.meta.mtime))
.unwrap();
let mut node = nodes.into_iter().max_by(|n1, n2| cmp(n1, n2)).unwrap();
// if this is a dir, merge with all other dirs
if node.is_dir() {
node.subtree = Some(merge_trees(be, trees, cmp, save)?);
node.subtree = Some(merge_trees(be, trees, cmp, save, summary)?);
} else {
summary.files_unmodified += 1;
summary.total_files_processed += 1;
summary.total_bytes_processed += node.meta.size;
}
Ok(node)
}

110
src/commands/merge_cmd.rs Normal file
View File

@ -0,0 +1,110 @@
use anyhow::Result;
use chrono::Local;
use clap::{AppSettings, Parser};
use log::*;
use crate::backend::{DecryptWriteBackend, FileType};
use crate::blob::{merge_trees, BlobType, Node, Packer, Tree};
use crate::index::{IndexBackend, Indexer, ReadIndex};
use crate::repofile::{PathList, SnapshotFile, SnapshotFilter, SnapshotOptions};
use crate::repository::OpenRepository;
use super::helpers::{progress_counter, progress_spinner};
use super::rustic_config::RusticConfig;
// Command-line options of the `merge` subcommand.
//
// NOTE(review): the explanatory comments added here are plain `//` comments on
// purpose. The clap derive turns `///` doc comments into user-visible help
// text, so the existing `///` lines are left untouched.
#[derive(Default, Parser)]
#[clap(global_setting(AppSettings::DeriveDisplayOrder))]
pub(super) struct Opts {
/// Output generated snapshot in json format
// Checked by `execute` after the merged snapshot has been saved.
#[clap(long)]
json: bool,
/// Remove input snapshots after merging
// Checked by `execute` as its last step, after the merged snapshot has been saved.
#[clap(long)]
delete: bool,
// Snapshot options, flattened into this command's arguments; `execute`
// passes them to `SnapshotFile::new_from_options` to create the merged snapshot.
#[clap(flatten)]
snap_opts: SnapshotOptions,
// Snapshot filter; `execute` first merges the "snapshot-filter" section of
// the config file into it and applies it only when `ids` is empty.
#[clap(flatten, help_heading = "SNAPSHOT FILTER OPTIONS")]
filter: SnapshotFilter,
/// Snapshots to merge. If none is given, use filter to filter from all snapshots.
#[clap(value_name = "ID")]
ids: Vec<String>,
}
/// Runs the `merge` command: merges the trees of several snapshots into one
/// new snapshot and saves it to the repository.
///
/// The input snapshots are those named by `opts.ids`; if no id is given, all
/// snapshots matching `opts.filter` are used (after the "snapshot-filter"
/// section of `config_file` has been merged into the filter).
///
/// When several input trees contain an entry with the same name, the entry
/// with the latest modification time is kept. Tree blobs which are not yet
/// present in the index are packed and written to the repository.
///
/// With `opts.json` the saved snapshot is printed to stdout as pretty JSON;
/// with `opts.delete` the input snapshots are removed afterwards.
///
/// # Errors
///
/// Propagates every error reported while reading snapshots or the index,
/// merging trees, packing blobs, or saving / deleting snapshot files.
///
/// # Panics
///
/// Panics if the freshly created snapshot carries no summary or if the lock
/// of the shared indexer cannot be acquired.
pub(super) fn execute(
    repo: OpenRepository,
    mut opts: Opts,
    config_file: RusticConfig,
    command: String,
) -> Result<()> {
    let started = Local::now();
    let backend = &repo.dbe;

    // Filter settings from the config file are merged in before selecting snapshots.
    config_file.merge_into("snapshot-filter", &mut opts.filter)?;

    let sources = if opts.ids.is_empty() {
        SnapshotFile::all_from_backend(backend, &opts.filter)?
    } else {
        SnapshotFile::from_ids(backend, &opts.ids)?
    };

    let index = IndexBackend::only_full_trees(&backend.clone(), progress_counter(""))?;
    let shared_indexer = Indexer::new(backend.clone()).into_shared();
    let tree_packer = Packer::new(
        backend.clone(),
        BlobType::Tree,
        shared_indexer.clone(),
        &repo.config,
        index.total_size(BlobType::Tree),
    )?;

    // The new snapshot covers the paths of all input snapshots.
    let mut merged = SnapshotFile::new_from_options(opts.snap_opts, started, command)?;
    let all_paths = PathList::from_strings(sources.iter().flat_map(|sn| sn.paths.iter()), false)?;
    merged.paths.set_paths(&all_paths.paths())?;

    let mut summary = merged.summary.take().unwrap();
    summary.backup_start = Local::now();

    let spinner = progress_spinner("merging snapshots...");

    let root_trees = sources.iter().map(|sn| sn.tree).collect();

    // Of several nodes with the same name, the most recently modified one wins.
    let newest_wins = |a: &Node, b: &Node| a.meta.mtime.cmp(&b.meta.mtime);

    // Serializes a merged tree and packs it unless the index already contains it.
    let store_tree = |tree: Tree| {
        let (data, tree_id) = tree.serialize()?;
        let data_len = u64::try_from(data.len())?;
        if !index.has_tree(&tree_id) {
            tree_packer.add(&data, &tree_id)?;
        }
        Ok((tree_id, data_len))
    };

    merged.tree = merge_trees(&index, root_trees, &newest_wins, &store_tree, &mut summary)?;

    // Flush outstanding packs and index entries before the snapshot is written.
    tree_packer.finalize()?.apply(&mut summary, BlobType::Tree);
    shared_indexer.write().unwrap().finalize()?;
    spinner.finish();

    summary.finalize(started)?;
    merged.summary = Some(summary);

    let new_id = backend.save_file(&merged)?;
    merged.id = new_id;

    if opts.json {
        serde_json::to_writer_pretty(std::io::stdout(), &merged)?;
    }
    info!("saved new snapshot as {new_id}.");

    if opts.delete {
        let counter = progress_counter("deleting old snapshots...");
        backend.delete_list(
            FileType::Snapshot,
            true,
            sources.iter().map(|sn| &sn.id),
            counter,
        )?;
    }

    Ok(())
}

View File

@ -27,6 +27,7 @@ mod init;
mod key;
mod list;
mod ls;
mod merge_cmd;
mod prune;
mod repair;
mod repoinfo;
@ -119,6 +120,9 @@ enum Command {
/// List file contents of a snapshot
Ls(ls::Opts),
/// Merge snapshots
Merge(merge_cmd::Opts),
/// Show a detailed overview of the snapshots within the repository
Snapshots(snapshots::Opts),
@ -221,6 +225,7 @@ pub fn execute() -> Result<()> {
Command::Key(opts) => key::execute(repo, opts)?,
Command::List(opts) => list::execute(repo, opts)?,
Command::Ls(opts) => ls::execute(repo, opts, config_file)?,
Command::Merge(opts) => merge_cmd::execute(repo, opts, config_file, command)?,
Command::SelfUpdate(_) => {} // already handled above
Command::Snapshots(opts) => snapshots::execute(repo, opts, config_file)?,
Command::Prune(opts) => prune::execute(repo, opts, vec![])?,

View File

@ -28,7 +28,7 @@ pub struct SnapshotOptions {
#[clap(long, value_name = "LABEL")]
label: Option<String>,
/// Tags to add to backup (can be specified multiple times)
/// Tags to add to snapshot (can be specified multiple times)
#[clap(long, value_name = "TAG[,TAG,..]")]
#[serde_as(as = "Vec<DisplayFromStr>")]
#[merge(strategy = merge::vec::overwrite_empty)]
@ -574,6 +574,10 @@ impl StringList {
pub fn formatln(&self) -> String {
self.0.join("\n")
}
pub fn iter(&self) -> std::slice::Iter<String> {
self.0.iter()
}
}
#[derive(Default, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]