Parallelize backend operations

This commit is contained in:
Alexander Weiss 2022-10-16 13:27:29 +02:00
parent 606f08648e
commit 683f2bc26c
19 changed files with 184 additions and 188 deletions

60
Cargo.lock generated
View File

@ -356,6 +356,40 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.11"
@ -1407,6 +1441,30 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rayon"
version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -1570,6 +1628,7 @@ dependencies = [
"chrono",
"clap",
"clap_complete",
"crossbeam-channel",
"derivative",
"derive-getters",
"derive_more",
@ -1595,6 +1654,7 @@ dependencies = [
"quickcheck",
"quickcheck_macros",
"rand",
"rayon",
"reqwest",
"rpassword",
"rstest",

View File

@ -32,6 +32,9 @@ derivative = "2"
derive-getters = "0.2"
lazy_static = "1"
log = "0.4"
# parallelize
crossbeam-channel = "0.5"
rayon = "1"
# async
tokio = { version = "1", features = ["full"] }
futures = "0.3"

View File

@ -1,11 +1,10 @@
use std::num::NonZeroU32;
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream, stream::FuturesUnordered, StreamExt};
use crossbeam_channel::{unbounded, Receiver};
use indicatif::ProgressBar;
use tokio::{spawn, task::JoinHandle};
use rayon::prelude::*;
use zstd::stream::{copy_encode, decode_all};
use super::{FileType, Id, ReadBackend, RepoFile, WriteBackend};
@ -14,7 +13,6 @@ use crate::crypto::{hash, CryptoKey};
pub trait DecryptFullBackend: DecryptWriteBackend + DecryptReadBackend {}
impl<T: DecryptWriteBackend + DecryptReadBackend> DecryptFullBackend for T {}
#[async_trait]
pub trait DecryptReadBackend: ReadBackend {
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>>;
@ -52,38 +50,25 @@ pub trait DecryptReadBackend: ReadBackend {
Ok(serde_json::from_slice(&data)?)
}
async fn stream_all<F: RepoFile>(
&self,
p: ProgressBar,
) -> Result<FuturesUnordered<JoinHandle<(Id, F)>>> {
fn stream_all<F: RepoFile>(&self, p: ProgressBar) -> Result<Receiver<(Id, F)>> {
let list = self.list(F::TYPE)?;
self.stream_list(list, p).await
self.stream_list(list, p)
}
async fn stream_list<F: RepoFile>(
&self,
list: Vec<Id>,
p: ProgressBar,
) -> Result<FuturesUnordered<JoinHandle<(Id, F)>>> {
fn stream_list<F: RepoFile>(&self, list: Vec<Id>, p: ProgressBar) -> Result<Receiver<(Id, F)>> {
p.set_length(list.len() as u64);
let (tx, rx) = unbounded();
let stream: FuturesUnordered<_> = list
.into_iter()
.map(|id| {
let be = self.clone();
let p = p.clone();
spawn(async move {
let file = be.get_file::<F>(&id).unwrap();
p.inc(1);
(id, file)
})
})
.collect();
Ok(stream)
list.into_par_iter()
.for_each_with((self, p, tx), |(be, p, tx), id| {
let file = be.get_file::<F>(&id).unwrap();
p.inc(1);
tx.send((id, file)).unwrap();
});
Ok(rx)
}
}
#[async_trait]
pub trait DecryptWriteBackend: WriteBackend {
type Key: CryptoKey;
fn key(&self) -> &Self::Key;
@ -91,26 +76,20 @@ pub trait DecryptWriteBackend: WriteBackend {
fn save_file<F: RepoFile>(&self, file: &F) -> Result<Id> {
let data = serde_json::to_vec(file)?;
Ok(self.hash_write_full(F::TYPE, &data)?)
self.hash_write_full(F::TYPE, &data)
}
async fn save_list<F: RepoFile>(&self, list: Vec<F>, p: ProgressBar) -> Result<()> {
fn save_list<F: RepoFile>(&self, list: Vec<F>, p: ProgressBar) -> Result<()> {
p.set_length(list.len() as u64);
stream::iter(list.into_iter().map(|file| {
let be = self.clone();
let p = p.clone();
(file, be, p)
}))
.for_each_concurrent(5, |(file, be, p)| async move {
be.save_file(&file).unwrap();
list.par_iter().for_each(|file| {
self.save_file(file).unwrap();
p.inc(1);
})
.await;
});
p.finish();
Ok(())
}
async fn delete_list(
fn delete_list(
&self,
tpe: FileType,
cacheable: bool,
@ -118,16 +97,10 @@ pub trait DecryptWriteBackend: WriteBackend {
p: ProgressBar,
) -> Result<()> {
p.set_length(list.len() as u64);
stream::iter(list.into_iter().map(|id| {
let be = self.clone();
let p = p.clone();
(id, be, p)
}))
.for_each_concurrent(20, |(id, be, p)| async move {
be.remove(tpe, &id, cacheable).unwrap();
list.par_iter().for_each(|id| {
self.remove(tpe, id, cacheable).unwrap();
p.inc(1);
})
.await;
});
p.finish();
Ok(())
@ -153,7 +126,6 @@ impl<R: ReadBackend, C: CryptoKey> DecryptBackend<R, C> {
}
}
#[async_trait]
impl<R: WriteBackend, C: CryptoKey> DecryptWriteBackend for DecryptBackend<R, C> {
type Key = C;
fn key(&self) -> &Self::Key {
@ -179,7 +151,6 @@ impl<R: WriteBackend, C: CryptoKey> DecryptWriteBackend for DecryptBackend<R, C>
}
}
#[async_trait]
impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
Ok(self.key.decrypt_data(data)?)

View File

@ -116,7 +116,7 @@ pub(super) async fn execute(
}
};
let index = IndexBackend::only_full_trees(&be.clone(), progress_counter("")).await?;
let index = IndexBackend::only_full_trees(&be.clone(), progress_counter(""))?;
for source in sources {
let mut opts = opts.clone();
@ -161,7 +161,6 @@ pub(super) async fn execute(
|snap| snap.hostname == hostname && snap.paths.contains(&backup_path_str),
progress_counter(""),
)
.await
.ok(),
(false, false, Some(parent)) => SnapshotFile::from_id(&be, &parent).ok(),
};

View File

@ -46,16 +46,16 @@ struct TreeOpts {
snap: String,
}
pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> {
pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> {
match opts.command {
Command::Config => cat_file(be, FileType::Config, IdOpt::default()),
Command::Index(opt) => cat_file(be, FileType::Index, opt),
Command::Snapshot(opt) => cat_file(be, FileType::Snapshot, opt),
// special treatment for catingg blobs: read the index and use it to locate the blob
Command::TreeBlob(opt) => cat_blob(be, BlobType::Tree, opt).await,
Command::DataBlob(opt) => cat_blob(be, BlobType::Data, opt).await,
Command::TreeBlob(opt) => cat_blob(be, BlobType::Tree, opt),
Command::DataBlob(opt) => cat_blob(be, BlobType::Data, opt),
// special treatment for cating a tree within a snapshot
Command::Tree(opts) => cat_tree(be, opts).await,
Command::Tree(opts) => cat_tree(be, opts),
}
}
@ -67,20 +67,18 @@ fn cat_file(be: &impl DecryptReadBackend, tpe: FileType, opt: IdOpt) -> Result<(
Ok(())
}
async fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Result<()> {
fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Result<()> {
let id = Id::from_hex(&opt.id)?;
let data = IndexBackend::new(be, ProgressBar::hidden())
.await?
.blob_from_backend(&tpe, &id)?;
let data = IndexBackend::new(be, ProgressBar::hidden())?.blob_from_backend(&tpe, &id)?;
print!("{}", String::from_utf8(data.to_vec())?);
Ok(())
}
async fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> {
fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> {
let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, ""));
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?;
let index = IndexBackend::new(be, progress_counter("")).await?;
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?;
let index = IndexBackend::new(be, progress_counter(""))?;
let id = Tree::subtree_id(&index, snap.tree, Path::new(path))?;
let data = index.blob_from_backend(&BlobType::Tree, &id)?;
println!("{}", String::from_utf8(data.to_vec())?);

View File

@ -56,11 +56,11 @@ pub(super) async fn execute(
if let Some(hot_be) = hot_be {
for file_type in [FileType::Snapshot, FileType::Index] {
check_hot_files(raw_be, hot_be, file_type).await?;
check_hot_files(raw_be, hot_be, file_type)?;
}
}
let index_collector = check_packs(be, hot_be, opts.read_data).await?;
let index_collector = check_packs(be, hot_be, opts.read_data)?;
if !opts.trust_cache {
if let Some(cache) = &cache {
@ -102,7 +102,7 @@ pub(super) async fn execute(
Ok(())
}
async fn check_hot_files(
fn check_hot_files(
be: &impl ReadBackend,
be_hot: &impl ReadBackend,
file_type: FileType,
@ -180,7 +180,7 @@ async fn check_cache_files(
}
// check if packs correspond to index
async fn check_packs(
fn check_packs(
be: &impl DecryptReadBackend,
hot_be: &Option<impl ReadBackend>,
read_data: bool,
@ -224,9 +224,7 @@ async fn check_packs(
};
let p = progress_counter("reading index...");
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
while let Some(index) = stream.try_next().await? {
let index = index.1;
for (_, index) in be.stream_all::<IndexFile>(p.clone())? {
index_collector.extend(index.packs.clone());
for p in index.packs {
process_pack(p);
@ -235,22 +233,23 @@ async fn check_packs(
process_pack(p);
}
}
p.finish();
if let Some(hot_be) = hot_be {
let p = progress_spinner("listing packs in hot repo...");
check_packs_list(hot_be, tree_packs).await?;
check_packs_list(hot_be, tree_packs)?;
p.finish();
}
let p = progress_spinner("listing packs...");
check_packs_list(be, packs).await?;
check_packs_list(be, packs)?;
p.finish();
Ok(index_collector)
}
async fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<Id, u32>) -> Result<()> {
fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<Id, u32>) -> Result<()> {
for (id, size) in be.list_with_size(FileType::Pack)? {
match packs.remove(&id) {
None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."),
@ -272,11 +271,10 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> {
let p = progress_counter("reading snapshots...");
let snap_trees: Vec<_> = index
.be()
.stream_all::<SnapshotFile>(p.clone())
.await?
.map_ok(|(_, snap)| snap.tree)
.try_collect()
.await?;
.stream_all::<SnapshotFile>(p.clone())?
.iter()
.map(|(_, snap)| snap.tree)
.collect();
p.finish();
let p = progress_counter("checking trees...");

View File

@ -28,12 +28,12 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
let p = progress_spinner("getting snapshots...");
p.finish();
let snaps = SnapshotFile::from_ids(be, &[id1.to_string(), id2.to_string()]).await?;
let snaps = SnapshotFile::from_ids(be, &[id1.to_string(), id2.to_string()])?;
let snap1 = &snaps[0];
let snap2 = &snaps[1];
let index = IndexBackend::new(be, progress_counter("")).await?;
let index = IndexBackend::new(be, progress_counter(""))?;
let id1 = Tree::subtree_id(&index, snap1.tree, Path::new(path1))?;
let id2 = Tree::subtree_id(&index, snap2.tree, Path::new(path2))?;

View File

@ -74,10 +74,10 @@ pub(super) async fn execute(
.unwrap_or_else(|| SnapshotGroupCriterion::from_str("host,paths").unwrap());
let groups = match opts.ids.is_empty() {
true => SnapshotFile::group_from_backend(be, &opts.config.filter, &group_by).await?,
true => SnapshotFile::group_from_backend(be, &opts.config.filter, &group_by)?,
false => vec![(
SnapshotGroup::default(),
SnapshotFile::from_ids(be, &opts.ids).await?,
SnapshotFile::from_ids(be, &opts.ids)?,
)],
};
let mut forget_snaps = Vec::new();
@ -146,8 +146,7 @@ pub(super) async fn execute(
),
(false, false) => {
let p = progress_counter("removing snapshots...");
be.delete_list(FileType::Snapshot, true, forget_snaps.clone(), p)
.await?;
be.delete_list(FileType::Snapshot, true, forget_snaps.clone(), p)?;
}
}

View File

@ -1,6 +1,5 @@
use anyhow::{bail, Result};
use clap::Parser;
use futures::StreamExt;
use indicatif::ProgressBar;
use crate::backend::{DecryptReadBackend, FileType};
@ -13,13 +12,12 @@ pub(super) struct Opts {
tpe: String,
}
pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> {
pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> {
let tpe = match opts.tpe.as_str() {
// special treatment for listing blobs: read the index and display it
"blobs" => {
let mut stream = be.stream_all::<IndexFile>(ProgressBar::hidden()).await?;
while let Some(index) = stream.next().await {
for pack in index?.1.packs {
for (_, index) in be.stream_all::<IndexFile>(ProgressBar::hidden())? {
for pack in index.packs {
for blob in pack.blobs {
println!("{:?} {}", blob.tpe, blob.id.to_hex());
}

View File

@ -18,8 +18,8 @@ pub(super) struct Opts {
pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) -> Result<()> {
let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, ""));
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?;
let index = IndexBackend::new(be, progress_counter("")).await?;
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?;
let index = IndexBackend::new(be, progress_counter(""))?;
let tree = Tree::subtree_id(&index, snap.tree, Path::new(path))?;
let mut tree_streamer = NodeStreamer::new(index, tree)?;

View File

@ -307,22 +307,22 @@ pub async fn execute() -> Result<()> {
match cmd {
Command::Backup(opts) => backup::execute(&dbe, opts, config, config_file, command).await?,
Command::Config(opts) => config::execute(&dbe, &be_hot, opts, config)?,
Command::Cat(opts) => cat::execute(&dbe, opts).await?,
Command::Cat(opts) => cat::execute(&dbe, opts)?,
Command::Check(opts) => check::execute(&dbe, &cache, &be_hot, &be, opts).await?,
Command::Completions(_) => {} // already handled above
Command::Diff(opts) => diff::execute(&dbe, opts).await?,
Command::Forget(opts) => forget::execute(&dbe, cache, opts, config, config_file).await?,
Command::Init(_) => {} // already handled above
Command::Key(opts) => key::execute(&dbe, key, opts)?,
Command::List(opts) => list::execute(&dbe, opts).await?,
Command::List(opts) => list::execute(&dbe, opts)?,
Command::Ls(opts) => ls::execute(&dbe, opts).await?,
Command::SelfUpdate(_) => {} // already handled above
Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file).await?,
Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file)?,
Command::Prune(opts) => prune::execute(&dbe, cache, opts, config, vec![]).await?,
Command::Restore(opts) => restore::execute(&dbe, opts).await?,
Command::Repair(opts) => repair::execute(&dbe, opts, config_file, &config).await?,
Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts).await?,
Command::Tag(opts) => tag::execute(&dbe, opts, config_file).await?,
Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts)?,
Command::Tag(opts) => tag::execute(&dbe, opts, config_file)?,
};
Ok(())

View File

@ -7,7 +7,7 @@ use bytesize::ByteSize;
use chrono::{DateTime, Duration, Local};
use clap::{AppSettings, Parser};
use derive_more::Add;
use futures::{future, TryStreamExt};
use futures::TryStreamExt;
use log::*;
use super::{bytes, no_progress, progress_bytes, progress_counter, wait, warm_up, warm_up_command};
@ -98,10 +98,9 @@ pub(super) async fn execute(
let mut index_files = Vec::new();
let p = progress_counter("reading index...");
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
let mut index_collector = IndexCollector::new(IndexType::OnlyTrees);
while let Some((id, index)) = stream.try_next().await? {
for (id, index) in be.stream_all::<IndexFile>(p.clone())? {
index_collector.extend(index.packs.clone());
// we add the trees from packs_to_delete to the index such that searching for
// used blobs doesn't abort if they are already marked for deletion
@ -879,8 +878,7 @@ impl Pruner {
let p = progress_counter("removing unindexed packs...");
let existing_packs: Vec<_> =
self.existing_packs.into_iter().map(|(id, _)| id).collect();
be.delete_list(FileType::Pack, true, existing_packs, p)
.await?;
be.delete_list(FileType::Pack, true, existing_packs, p)?;
} else {
info!("marking not needed unindexed pack files for deletion...");
for (id, size) in self.existing_packs {
@ -991,20 +989,17 @@ impl Pruner {
if !data_packs_remove.is_empty() {
let p = progress_counter("removing old data packs...");
be.delete_list(FileType::Pack, false, data_packs_remove, p)
.await?;
be.delete_list(FileType::Pack, false, data_packs_remove, p)?;
}
if !tree_packs_remove.is_empty() {
let p = progress_counter("removing old tree packs...");
be.delete_list(FileType::Pack, true, tree_packs_remove, p)
.await?;
be.delete_list(FileType::Pack, true, tree_packs_remove, p)?;
}
if !indexes_remove.is_empty() {
let p = progress_counter("removing old index files...");
be.delete_list(FileType::Index, true, indexes_remove, p)
.await?;
be.delete_list(FileType::Index, true, indexes_remove, p)?;
}
Ok(())
@ -1090,14 +1085,13 @@ async fn find_used_blobs(
let p = progress_counter("reading snapshots...");
let snap_trees: Vec<_> = index
.be()
.stream_all::<SnapshotFile>(p.clone())
.await?
.stream_all::<SnapshotFile>(p.clone())?
.into_iter()
// TODO: it would even better to give ignore_snaps to the streaming function instead
// if reading and then filtering the snapshot
.try_filter(|(id, _)| future::ready(!ignore_snaps.contains(id)))
.map_ok(|(_, snap)| snap.tree)
.try_collect()
.await?;
.filter(|(id, _)| !ignore_snaps.contains(id))
.map(|(_, snap)| snap.tree)
.collect();
p.finish();
let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect();

View File

@ -3,7 +3,6 @@ use std::collections::{HashMap, HashSet};
use anyhow::Result;
use async_recursion::async_recursion;
use clap::{AppSettings, Parser, Subcommand};
use futures::TryStreamExt;
use log::*;
use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType};
@ -142,12 +141,9 @@ async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<(
};
let p = progress_counter("reading index...");
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
while let Some(index) = stream.try_next().await? {
for (index_id, index) in be.stream_all::<IndexFile>(p.clone())? {
let mut new_index = IndexFile::default();
let mut changed = false;
let index_id = index.0;
let index = index.1;
for p in index.packs {
process_pack(p, false, &mut new_index, &mut changed);
}
@ -214,15 +210,15 @@ async fn repair_snaps(
config_file.merge_into("snapshot-filter", &mut opts.filter)?;
let snapshots = match opts.ids.is_empty() {
true => SnapshotFile::all_from_backend(be, &opts.filter).await?,
false => SnapshotFile::from_ids(be, &opts.ids).await?,
true => SnapshotFile::all_from_backend(be, &opts.filter)?,
false => SnapshotFile::from_ids(be, &opts.ids)?,
};
let mut replaced = HashMap::new();
let mut seen = HashSet::new();
let mut delete = Vec::new();
let index = IndexBackend::new(&be.clone(), progress_counter("")).await?;
let index = IndexBackend::new(&be.clone(), progress_counter(""))?;
let indexer = Indexer::new(be.clone()).into_shared();
let mut packer = Packer::new(
be.clone(),
@ -284,8 +280,7 @@ async fn repair_snaps(
true,
delete,
progress_counter("remove defect snapshots"),
)
.await?;
)?;
}
}

View File

@ -1,7 +1,6 @@
use anyhow::Result;
use clap::Parser;
use derive_more::Add;
use futures::TryStreamExt;
use log::*;
use prettytable::{format, row, Table};
@ -14,19 +13,16 @@ use crate::repo::{IndexFile, IndexPack};
#[derive(Parser)]
pub(super) struct Opts;
pub(super) async fn execute(
pub(super) fn execute(
be: &impl DecryptReadBackend,
hot_be: &Option<impl ReadBackend>,
_opts: Opts,
) -> Result<()> {
fileinfo("repository files", be).await?;
fileinfo("repository files", be)?;
if let Some(hot_be) = hot_be {
fileinfo("hot repository files", hot_be).await?;
fileinfo("hot repository files", hot_be)?;
}
let p = progress_counter("scanning index...");
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
#[derive(Default, Clone, Copy, Add)]
struct Info {
count: u64,
@ -59,7 +55,8 @@ pub(super) async fn execute(
info[BlobType::Data].min_pack_size = u64::MAX;
let mut info_delete = BlobTypeMap::<Info>::default();
while let Some((_, index)) = stream.try_next().await? {
let p = progress_counter("scanning index...");
for (_, index) in be.stream_all::<IndexFile>(p.clone())? {
for pack in &index.packs {
info[pack.blob_type()].add_pack(pack);
@ -109,7 +106,7 @@ pub(super) async fn execute(
Ok(())
}
async fn fileinfo(text: &str, be: &impl ReadBackend) -> Result<()> {
fn fileinfo(text: &str, be: &impl ReadBackend) -> Result<()> {
info!("scanning files...");
let mut table = Table::new();

View File

@ -67,9 +67,9 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
}
let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, ""));
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?;
let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?;
let index = IndexBackend::new(be, progress_counter("")).await?;
let index = IndexBackend::new(be, progress_counter(""))?;
let tree = Tree::subtree_id(&index, snap.tree, Path::new(path))?;
let dest = LocalBackend::new(&opts.dest);

View File

@ -43,7 +43,7 @@ pub(super) struct Opts {
ids: Vec<String>,
}
pub(super) async fn execute(
pub(super) fn execute(
be: &impl DecryptReadBackend,
mut opts: Opts,
config_file: RusticConfig,
@ -51,10 +51,9 @@ pub(super) async fn execute(
config_file.merge_into("snapshot-filter", &mut opts.filter)?;
let groups = match &opts.ids[..] {
[] => SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by).await?,
[] => SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by)?,
[id] if id == "latest" => {
SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by)
.await?
SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by)?
.into_iter()
.map(|(group, mut snaps)| {
snaps.sort_unstable();
@ -67,7 +66,7 @@ pub(super) async fn execute(
}
_ => vec![(
SnapshotGroup::default(),
SnapshotFile::from_ids(be, &opts.ids).await?,
SnapshotFile::from_ids(be, &opts.ids)?,
)],
};

View File

@ -68,7 +68,7 @@ pub(super) struct Opts {
ids: Vec<String>,
}
pub(super) async fn execute(
pub(super) fn execute(
be: &impl DecryptFullBackend,
mut opts: Opts,
config_file: RusticConfig,
@ -76,8 +76,8 @@ pub(super) async fn execute(
config_file.merge_into("snapshot-filter", &mut opts.filter)?;
let snapshots = match opts.ids.is_empty() {
true => SnapshotFile::all_from_backend(be, &opts.filter).await?,
false => SnapshotFile::from_ids(be, &opts.ids).await?,
true => SnapshotFile::all_from_backend(be, &opts.filter)?,
false => SnapshotFile::from_ids(be, &opts.ids)?,
};
let delete = match (
@ -109,11 +109,10 @@ pub(super) async fn execute(
),
(false, false) => {
let p = progress_counter("saving new snapshots...");
be.save_list(snapshots, p).await?;
be.save_list(snapshots, p)?;
let p = progress_counter("deleting old snapshots...");
be.delete_list(FileType::Snapshot, true, old_snap_ids, p)
.await?;
be.delete_list(FileType::Snapshot, true, old_snap_ids, p)?;
}
}
Ok(())

View File

@ -2,11 +2,9 @@ use std::num::NonZeroU32;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use derive_getters::Getters;
use derive_more::Constructor;
use futures::StreamExt;
use indicatif::ProgressBar;
use crate::backend::{DecryptReadBackend, FileType};
@ -83,7 +81,6 @@ pub trait ReadIndex {
}
}
#[async_trait]
pub trait IndexedBackend: ReadIndex + Clone + Sync + Send + 'static {
type Backend: DecryptReadBackend;
@ -124,31 +121,23 @@ impl<BE: DecryptReadBackend> IndexBackend<BE> {
}
}
async fn new_from_collector(
be: &BE,
p: ProgressBar,
mut collector: IndexCollector,
) -> Result<Self> {
fn new_from_collector(be: &BE, p: ProgressBar, mut collector: IndexCollector) -> Result<Self> {
p.set_prefix("reading index...");
let mut stream = be
.stream_all::<IndexFile>(p.clone())
.await?
.map(|i| i.unwrap().1);
while let Some(index) = stream.next().await {
collector.extend(index.packs);
for (_, i) in be.stream_all::<IndexFile>(p.clone())? {
collector.extend(i.packs);
}
p.finish();
Ok(Self::new_from_index(be, collector.into_index()))
}
pub async fn new(be: &BE, p: ProgressBar) -> Result<Self> {
Self::new_from_collector(be, p, IndexCollector::new(IndexType::Full)).await
pub fn new(be: &BE, p: ProgressBar) -> Result<Self> {
Self::new_from_collector(be, p, IndexCollector::new(IndexType::Full))
}
pub async fn only_full_trees(be: &BE, p: ProgressBar) -> Result<Self> {
Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees)).await
pub fn only_full_trees(be: &BE, p: ProgressBar) -> Result<Self> {
Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees))
}
pub fn into_index(self) -> Index {

View File

@ -6,7 +6,6 @@ use anyhow::{anyhow, bail, Result};
use chrono::{DateTime, Local};
use clap::Parser;
use derivative::Derivative;
use futures::{future, TryStreamExt};
use indicatif::ProgressBar;
use log::*;
use merge::Merge;
@ -112,33 +111,33 @@ impl SnapshotFile {
Ok(Self::set_id((*id, be.get_file(id)?)))
}
pub async fn from_str<B: DecryptReadBackend>(
pub fn from_str<B: DecryptReadBackend>(
be: &B,
string: &str,
predicate: impl FnMut(&Self) -> bool,
predicate: impl FnMut(&Self) -> bool + Send + Sync,
p: ProgressBar,
) -> Result<Self> {
match string {
"latest" => Self::latest(be, predicate, p).await,
"latest" => Self::latest(be, predicate, p),
_ => Self::from_id(be, string),
}
}
/// Get the latest SnapshotFile from the backend
pub async fn latest<B: DecryptReadBackend>(
pub fn latest<B: DecryptReadBackend>(
be: &B,
predicate: impl FnMut(&Self) -> bool,
predicate: impl FnMut(&Self) -> bool + Send + Sync,
p: ProgressBar,
) -> Result<Self> {
p.set_prefix("getting latest snapshot...");
let mut latest: Option<Self> = None;
let mut pred = predicate;
let mut snaps = be.stream_all::<SnapshotFile>(p.clone()).await?;
while let Some((id, mut snap)) = snaps.try_next().await? {
for (id, mut snap) in be.stream_all::<SnapshotFile>(p.clone())? {
if !pred(&snap) {
continue;
}
snap.id = id;
match &latest {
Some(l) if l.time > snap.time => {}
@ -159,14 +158,13 @@ impl SnapshotFile {
}
/// Get a Vector of SnapshotFile from the backend by list of (parts of the) ids
pub async fn from_ids<B: DecryptReadBackend>(be: &B, ids: &[String]) -> Result<Vec<Self>> {
pub fn from_ids<B: DecryptReadBackend>(be: &B, ids: &[String]) -> Result<Vec<Self>> {
let ids = be.find_ids(FileType::Snapshot, ids)?;
Ok(be
.stream_list::<Self>(ids, ProgressBar::hidden())
.await?
.map_ok(Self::set_id)
.try_collect()
.await?)
.stream_list::<Self>(ids, ProgressBar::hidden())?
.into_iter()
.map(Self::set_id)
.collect())
}
fn cmp_group(&self, crit: &SnapshotGroupCriterion, other: &Self) -> Ordering {
@ -199,12 +197,12 @@ impl SnapshotFile {
/// Get SnapshotFiles which match the filter grouped by the group criterion
/// from the backend
pub async fn group_from_backend<B: DecryptReadBackend>(
pub fn group_from_backend<B: DecryptReadBackend>(
be: &B,
filter: &SnapshotFilter,
crit: &SnapshotGroupCriterion,
) -> Result<Vec<(SnapshotGroup, Vec<Self>)>> {
let mut snaps = Self::all_from_backend(be, filter).await?;
let mut snaps = Self::all_from_backend(be, filter)?;
snaps.sort_unstable_by(|sn1, sn2| sn1.cmp_group(crit, sn2));
let mut result = Vec::new();
@ -233,17 +231,16 @@ impl SnapshotFile {
Ok(result)
}
pub async fn all_from_backend<B: DecryptReadBackend>(
pub fn all_from_backend<B: DecryptReadBackend>(
be: &B,
filter: &SnapshotFilter,
) -> Result<Vec<Self>> {
Ok(be
.stream_all::<SnapshotFile>(ProgressBar::hidden())
.await?
.map_ok(Self::set_id)
.try_filter(|sn| future::ready(sn.matches(filter)))
.try_collect()
.await?)
.stream_all::<SnapshotFile>(ProgressBar::hidden())?
.into_iter()
.map(Self::set_id)
.filter(|sn| sn.matches(filter))
.collect())
}
pub fn matches(&self, filter: &SnapshotFilter) -> bool {