diff --git a/Cargo.lock b/Cargo.lock index 93dcfea..8d2e78c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -356,6 +356,40 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "715e8152b692bba2d374b53d4875445368fdf21a94751410af607a5ac677d1fc" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.11" @@ -1407,6 +1441,30 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" +dependencies = [ + "autocfg", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1570,6 +1628,7 @@ dependencies = [ "chrono", "clap", "clap_complete", + "crossbeam-channel", "derivative", "derive-getters", "derive_more", @@ -1595,6 +1654,7 @@ dependencies = [ "quickcheck", "quickcheck_macros", "rand", + "rayon", "reqwest", "rpassword", "rstest", diff --git a/Cargo.toml b/Cargo.toml index a2f7599..e9a602e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,9 @@ derivative = "2" derive-getters = "0.2" lazy_static = "1" log = "0.4" +# parallelize +crossbeam-channel = "0.5" +rayon = "1" # async tokio = { version = "1", features = ["full"] } futures = "0.3" diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index c30fc1e..6a90c75 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -1,11 +1,10 @@ use std::num::NonZeroU32; use anyhow::{bail, Result}; -use async_trait::async_trait; use bytes::Bytes; -use futures::{stream, stream::FuturesUnordered, StreamExt}; +use crossbeam_channel::{unbounded, Receiver}; use indicatif::ProgressBar; -use tokio::{spawn, task::JoinHandle}; +use rayon::prelude::*; use zstd::stream::{copy_encode, decode_all}; use super::{FileType, Id, ReadBackend, RepoFile, WriteBackend}; @@ -14,7 +13,6 @@ use crate::crypto::{hash, CryptoKey}; pub trait DecryptFullBackend: DecryptWriteBackend + DecryptReadBackend {} impl DecryptFullBackend for T {} -#[async_trait] pub trait DecryptReadBackend: ReadBackend { fn decrypt(&self, data: &[u8]) -> Result>; @@ -52,38 +50,25 @@ pub trait DecryptReadBackend: ReadBackend { Ok(serde_json::from_slice(&data)?) } - async fn stream_all( - &self, - p: ProgressBar, - ) -> Result>> { + fn stream_all(&self, p: ProgressBar) -> Result> { let list = self.list(F::TYPE)?; - self.stream_list(list, p).await + self.stream_list(list, p) } - async fn stream_list( - &self, - list: Vec, - p: ProgressBar, - ) -> Result>> { + fn stream_list(&self, list: Vec, p: ProgressBar) -> Result> { p.set_length(list.len() as u64); + let (tx, rx) = unbounded(); - let stream: FuturesUnordered<_> = list - .into_iter() - .map(|id| { - let be = self.clone(); - let p = p.clone(); - spawn(async move { - let file = be.get_file::(&id).unwrap(); - p.inc(1); - (id, file) - }) - }) - .collect(); - Ok(stream) + list.into_par_iter() + .for_each_with((self, p, tx), |(be, p, tx), id| { + let file = be.get_file::(&id).unwrap(); + p.inc(1); + tx.send((id, file)).unwrap(); + }); + Ok(rx) } } -#[async_trait] pub trait DecryptWriteBackend: WriteBackend { type Key: CryptoKey; fn key(&self) -> &Self::Key; @@ -91,26 +76,20 @@ pub trait DecryptWriteBackend: WriteBackend { fn save_file(&self, file: &F) -> Result { let data = serde_json::to_vec(file)?; - Ok(self.hash_write_full(F::TYPE, &data)?) + self.hash_write_full(F::TYPE, &data) } - async fn save_list(&self, list: Vec, p: ProgressBar) -> Result<()> { + fn save_list(&self, list: Vec, p: ProgressBar) -> Result<()> { p.set_length(list.len() as u64); - stream::iter(list.into_iter().map(|file| { - let be = self.clone(); - let p = p.clone(); - (file, be, p) - })) - .for_each_concurrent(5, |(file, be, p)| async move { - be.save_file(&file).unwrap(); + list.par_iter().for_each(|file| { + self.save_file(file).unwrap(); p.inc(1); - }) - .await; + }); p.finish(); Ok(()) } - async fn delete_list( + fn delete_list( &self, tpe: FileType, cacheable: bool, @@ -118,16 +97,10 @@ pub trait DecryptWriteBackend: WriteBackend { p: ProgressBar, ) -> Result<()> { p.set_length(list.len() as u64); - stream::iter(list.into_iter().map(|id| { - let be = self.clone(); - let p = p.clone(); - (id, be, p) - })) - .for_each_concurrent(20, |(id, be, p)| async move { - be.remove(tpe, &id, cacheable).unwrap(); + list.par_iter().for_each(|id| { + self.remove(tpe, id, cacheable).unwrap(); p.inc(1); - }) - .await; + }); p.finish(); Ok(()) @@ -153,7 +126,6 @@ impl DecryptBackend { } } -#[async_trait] impl DecryptWriteBackend for DecryptBackend { type Key = C; fn key(&self) -> &Self::Key { @@ -179,7 +151,6 @@ impl DecryptWriteBackend for DecryptBackend } } -#[async_trait] impl DecryptReadBackend for DecryptBackend { fn decrypt(&self, data: &[u8]) -> Result> { Ok(self.key.decrypt_data(data)?) diff --git a/src/commands/backup.rs b/src/commands/backup.rs index 7d8516f..80922ea 100644 --- a/src/commands/backup.rs +++ b/src/commands/backup.rs @@ -116,7 +116,7 @@ pub(super) async fn execute( } }; - let index = IndexBackend::only_full_trees(&be.clone(), progress_counter("")).await?; + let index = IndexBackend::only_full_trees(&be.clone(), progress_counter(""))?; for source in sources { let mut opts = opts.clone(); @@ -161,7 +161,6 @@ pub(super) async fn execute( |snap| snap.hostname == hostname && snap.paths.contains(&backup_path_str), progress_counter(""), ) - .await .ok(), (false, false, Some(parent)) => SnapshotFile::from_id(&be, &parent).ok(), }; diff --git a/src/commands/cat.rs b/src/commands/cat.rs index 9d3c9bb..ab2c60e 100644 --- a/src/commands/cat.rs +++ b/src/commands/cat.rs @@ -46,16 +46,16 @@ struct TreeOpts { snap: String, } -pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> { +pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> { match opts.command { Command::Config => cat_file(be, FileType::Config, IdOpt::default()), Command::Index(opt) => cat_file(be, FileType::Index, opt), Command::Snapshot(opt) => cat_file(be, FileType::Snapshot, opt), // special treatment for catingg blobs: read the index and use it to locate the blob - Command::TreeBlob(opt) => cat_blob(be, BlobType::Tree, opt).await, - Command::DataBlob(opt) => cat_blob(be, BlobType::Data, opt).await, + Command::TreeBlob(opt) => cat_blob(be, BlobType::Tree, opt), + Command::DataBlob(opt) => cat_blob(be, BlobType::Data, opt), // special treatment for cating a tree within a snapshot - Command::Tree(opts) => cat_tree(be, opts).await, + Command::Tree(opts) => cat_tree(be, opts), } } @@ -67,20 +67,18 @@ fn cat_file(be: &impl DecryptReadBackend, tpe: FileType, opt: IdOpt) -> Result<( Ok(()) } -async fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Result<()> { +fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Result<()> { let id = Id::from_hex(&opt.id)?; - let data = IndexBackend::new(be, ProgressBar::hidden()) - .await? - .blob_from_backend(&tpe, &id)?; + let data = IndexBackend::new(be, ProgressBar::hidden())?.blob_from_backend(&tpe, &id)?; print!("{}", String::from_utf8(data.to_vec())?); Ok(()) } -async fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> { +fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> { let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, "")); - let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?; - let index = IndexBackend::new(be, progress_counter("")).await?; + let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?; + let index = IndexBackend::new(be, progress_counter(""))?; let id = Tree::subtree_id(&index, snap.tree, Path::new(path))?; let data = index.blob_from_backend(&BlobType::Tree, &id)?; println!("{}", String::from_utf8(data.to_vec())?); diff --git a/src/commands/check.rs b/src/commands/check.rs index 18e77a1..1dbf85b 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -56,11 +56,11 @@ pub(super) async fn execute( if let Some(hot_be) = hot_be { for file_type in [FileType::Snapshot, FileType::Index] { - check_hot_files(raw_be, hot_be, file_type).await?; + check_hot_files(raw_be, hot_be, file_type)?; } } - let index_collector = check_packs(be, hot_be, opts.read_data).await?; + let index_collector = check_packs(be, hot_be, opts.read_data)?; if !opts.trust_cache { if let Some(cache) = &cache { @@ -102,7 +102,7 @@ pub(super) async fn execute( Ok(()) } -async fn check_hot_files( +fn check_hot_files( be: &impl ReadBackend, be_hot: &impl ReadBackend, file_type: FileType, @@ -180,7 +180,7 @@ async fn check_cache_files( } // check if packs correspond to index -async fn check_packs( +fn check_packs( be: &impl DecryptReadBackend, hot_be: &Option, read_data: bool, @@ -224,9 +224,7 @@ async fn check_packs( }; let p = progress_counter("reading index..."); - let mut stream = be.stream_all::(p.clone()).await?; - while let Some(index) = stream.try_next().await? { - let index = index.1; + for (_, index) in be.stream_all::(p.clone())? { index_collector.extend(index.packs.clone()); for p in index.packs { process_pack(p); @@ -235,22 +233,23 @@ async fn check_packs( process_pack(p); } } + p.finish(); if let Some(hot_be) = hot_be { let p = progress_spinner("listing packs in hot repo..."); - check_packs_list(hot_be, tree_packs).await?; + check_packs_list(hot_be, tree_packs)?; p.finish(); } let p = progress_spinner("listing packs..."); - check_packs_list(be, packs).await?; + check_packs_list(be, packs)?; p.finish(); Ok(index_collector) } -async fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap) -> Result<()> { +fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap) -> Result<()> { for (id, size) in be.list_with_size(FileType::Pack)? { match packs.remove(&id) { None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."), @@ -272,11 +271,10 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> { let p = progress_counter("reading snapshots..."); let snap_trees: Vec<_> = index .be() - .stream_all::(p.clone()) - .await? - .map_ok(|(_, snap)| snap.tree) - .try_collect() - .await?; + .stream_all::(p.clone())? + .iter() + .map(|(_, snap)| snap.tree) + .collect(); p.finish(); let p = progress_counter("checking trees..."); diff --git a/src/commands/diff.rs b/src/commands/diff.rs index 8503a48..612f51c 100644 --- a/src/commands/diff.rs +++ b/src/commands/diff.rs @@ -28,12 +28,12 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) let p = progress_spinner("getting snapshots..."); p.finish(); - let snaps = SnapshotFile::from_ids(be, &[id1.to_string(), id2.to_string()]).await?; + let snaps = SnapshotFile::from_ids(be, &[id1.to_string(), id2.to_string()])?; let snap1 = &snaps[0]; let snap2 = &snaps[1]; - let index = IndexBackend::new(be, progress_counter("")).await?; + let index = IndexBackend::new(be, progress_counter(""))?; let id1 = Tree::subtree_id(&index, snap1.tree, Path::new(path1))?; let id2 = Tree::subtree_id(&index, snap2.tree, Path::new(path2))?; diff --git a/src/commands/forget.rs b/src/commands/forget.rs index 5df976b..5201f64 100644 --- a/src/commands/forget.rs +++ b/src/commands/forget.rs @@ -74,10 +74,10 @@ pub(super) async fn execute( .unwrap_or_else(|| SnapshotGroupCriterion::from_str("host,paths").unwrap()); let groups = match opts.ids.is_empty() { - true => SnapshotFile::group_from_backend(be, &opts.config.filter, &group_by).await?, + true => SnapshotFile::group_from_backend(be, &opts.config.filter, &group_by)?, false => vec![( SnapshotGroup::default(), - SnapshotFile::from_ids(be, &opts.ids).await?, + SnapshotFile::from_ids(be, &opts.ids)?, )], }; let mut forget_snaps = Vec::new(); @@ -146,8 +146,7 @@ pub(super) async fn execute( ), (false, false) => { let p = progress_counter("removing snapshots..."); - be.delete_list(FileType::Snapshot, true, forget_snaps.clone(), p) - .await?; + be.delete_list(FileType::Snapshot, true, forget_snaps.clone(), p)?; } } diff --git a/src/commands/list.rs b/src/commands/list.rs index 1eca397..3b9e5f9 100644 --- a/src/commands/list.rs +++ b/src/commands/list.rs @@ -1,6 +1,5 @@ use anyhow::{bail, Result}; use clap::Parser; -use futures::StreamExt; use indicatif::ProgressBar; use crate::backend::{DecryptReadBackend, FileType}; @@ -13,13 +12,12 @@ pub(super) struct Opts { tpe: String, } -pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> { +pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> { let tpe = match opts.tpe.as_str() { // special treatment for listing blobs: read the index and display it "blobs" => { - let mut stream = be.stream_all::(ProgressBar::hidden()).await?; - while let Some(index) = stream.next().await { - for pack in index?.1.packs { + for (_, index) in be.stream_all::(ProgressBar::hidden())? { + for pack in index.packs { for blob in pack.blobs { println!("{:?} {}", blob.tpe, blob.id.to_hex()); } diff --git a/src/commands/ls.rs b/src/commands/ls.rs index 477248a..f812eb4 100644 --- a/src/commands/ls.rs +++ b/src/commands/ls.rs @@ -18,8 +18,8 @@ pub(super) struct Opts { pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) -> Result<()> { let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, "")); - let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?; - let index = IndexBackend::new(be, progress_counter("")).await?; + let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?; + let index = IndexBackend::new(be, progress_counter(""))?; let tree = Tree::subtree_id(&index, snap.tree, Path::new(path))?; let mut tree_streamer = NodeStreamer::new(index, tree)?; diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 30c233e..da4d6de 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -307,22 +307,22 @@ pub async fn execute() -> Result<()> { match cmd { Command::Backup(opts) => backup::execute(&dbe, opts, config, config_file, command).await?, Command::Config(opts) => config::execute(&dbe, &be_hot, opts, config)?, - Command::Cat(opts) => cat::execute(&dbe, opts).await?, + Command::Cat(opts) => cat::execute(&dbe, opts)?, Command::Check(opts) => check::execute(&dbe, &cache, &be_hot, &be, opts).await?, Command::Completions(_) => {} // already handled above Command::Diff(opts) => diff::execute(&dbe, opts).await?, Command::Forget(opts) => forget::execute(&dbe, cache, opts, config, config_file).await?, Command::Init(_) => {} // already handled above Command::Key(opts) => key::execute(&dbe, key, opts)?, - Command::List(opts) => list::execute(&dbe, opts).await?, + Command::List(opts) => list::execute(&dbe, opts)?, Command::Ls(opts) => ls::execute(&dbe, opts).await?, Command::SelfUpdate(_) => {} // already handled above - Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file).await?, + Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file)?, Command::Prune(opts) => prune::execute(&dbe, cache, opts, config, vec![]).await?, Command::Restore(opts) => restore::execute(&dbe, opts).await?, Command::Repair(opts) => repair::execute(&dbe, opts, config_file, &config).await?, - Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts).await?, - Command::Tag(opts) => tag::execute(&dbe, opts, config_file).await?, + Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts)?, + Command::Tag(opts) => tag::execute(&dbe, opts, config_file)?, }; Ok(()) diff --git a/src/commands/prune.rs b/src/commands/prune.rs index cb8984c..804b304 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -7,7 +7,7 @@ use bytesize::ByteSize; use chrono::{DateTime, Duration, Local}; use clap::{AppSettings, Parser}; use derive_more::Add; -use futures::{future, TryStreamExt}; +use futures::TryStreamExt; use log::*; use super::{bytes, no_progress, progress_bytes, progress_counter, wait, warm_up, warm_up_command}; @@ -98,10 +98,9 @@ pub(super) async fn execute( let mut index_files = Vec::new(); let p = progress_counter("reading index..."); - let mut stream = be.stream_all::(p.clone()).await?; let mut index_collector = IndexCollector::new(IndexType::OnlyTrees); - while let Some((id, index)) = stream.try_next().await? { + for (id, index) in be.stream_all::(p.clone())? { index_collector.extend(index.packs.clone()); // we add the trees from packs_to_delete to the index such that searching for // used blobs doesn't abort if they are already marked for deletion @@ -879,8 +878,7 @@ impl Pruner { let p = progress_counter("removing unindexed packs..."); let existing_packs: Vec<_> = self.existing_packs.into_iter().map(|(id, _)| id).collect(); - be.delete_list(FileType::Pack, true, existing_packs, p) - .await?; + be.delete_list(FileType::Pack, true, existing_packs, p)?; } else { info!("marking not needed unindexed pack files for deletion..."); for (id, size) in self.existing_packs { @@ -991,20 +989,17 @@ impl Pruner { if !data_packs_remove.is_empty() { let p = progress_counter("removing old data packs..."); - be.delete_list(FileType::Pack, false, data_packs_remove, p) - .await?; + be.delete_list(FileType::Pack, false, data_packs_remove, p)?; } if !tree_packs_remove.is_empty() { let p = progress_counter("removing old tree packs..."); - be.delete_list(FileType::Pack, true, tree_packs_remove, p) - .await?; + be.delete_list(FileType::Pack, true, tree_packs_remove, p)?; } if !indexes_remove.is_empty() { let p = progress_counter("removing old index files..."); - be.delete_list(FileType::Index, true, indexes_remove, p) - .await?; + be.delete_list(FileType::Index, true, indexes_remove, p)?; } Ok(()) @@ -1090,14 +1085,13 @@ async fn find_used_blobs( let p = progress_counter("reading snapshots..."); let snap_trees: Vec<_> = index .be() - .stream_all::(p.clone()) - .await? + .stream_all::(p.clone())? + .into_iter() // TODO: it would even better to give ignore_snaps to the streaming function instead // if reading and then filtering the snapshot - .try_filter(|(id, _)| future::ready(!ignore_snaps.contains(id))) - .map_ok(|(_, snap)| snap.tree) - .try_collect() - .await?; + .filter(|(id, _)| !ignore_snaps.contains(id)) + .map(|(_, snap)| snap.tree) + .collect(); p.finish(); let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect(); diff --git a/src/commands/repair.rs b/src/commands/repair.rs index fac4446..d36bdcc 100644 --- a/src/commands/repair.rs +++ b/src/commands/repair.rs @@ -3,7 +3,6 @@ use std::collections::{HashMap, HashSet}; use anyhow::Result; use async_recursion::async_recursion; use clap::{AppSettings, Parser, Subcommand}; -use futures::TryStreamExt; use log::*; use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType}; @@ -142,12 +141,9 @@ async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<( }; let p = progress_counter("reading index..."); - let mut stream = be.stream_all::(p.clone()).await?; - while let Some(index) = stream.try_next().await? { + for (index_id, index) in be.stream_all::(p.clone())? { let mut new_index = IndexFile::default(); let mut changed = false; - let index_id = index.0; - let index = index.1; for p in index.packs { process_pack(p, false, &mut new_index, &mut changed); } @@ -214,15 +210,15 @@ async fn repair_snaps( config_file.merge_into("snapshot-filter", &mut opts.filter)?; let snapshots = match opts.ids.is_empty() { - true => SnapshotFile::all_from_backend(be, &opts.filter).await?, - false => SnapshotFile::from_ids(be, &opts.ids).await?, + true => SnapshotFile::all_from_backend(be, &opts.filter)?, + false => SnapshotFile::from_ids(be, &opts.ids)?, }; let mut replaced = HashMap::new(); let mut seen = HashSet::new(); let mut delete = Vec::new(); - let index = IndexBackend::new(&be.clone(), progress_counter("")).await?; + let index = IndexBackend::new(&be.clone(), progress_counter(""))?; let indexer = Indexer::new(be.clone()).into_shared(); let mut packer = Packer::new( be.clone(), @@ -284,8 +280,7 @@ async fn repair_snaps( true, delete, progress_counter("remove defect snapshots"), - ) - .await?; + )?; } } diff --git a/src/commands/repoinfo.rs b/src/commands/repoinfo.rs index ebc32f5..c975cc1 100644 --- a/src/commands/repoinfo.rs +++ b/src/commands/repoinfo.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Parser; use derive_more::Add; -use futures::TryStreamExt; use log::*; use prettytable::{format, row, Table}; @@ -14,19 +13,16 @@ use crate::repo::{IndexFile, IndexPack}; #[derive(Parser)] pub(super) struct Opts; -pub(super) async fn execute( +pub(super) fn execute( be: &impl DecryptReadBackend, hot_be: &Option, _opts: Opts, ) -> Result<()> { - fileinfo("repository files", be).await?; + fileinfo("repository files", be)?; if let Some(hot_be) = hot_be { - fileinfo("hot repository files", hot_be).await?; + fileinfo("hot repository files", hot_be)?; } - let p = progress_counter("scanning index..."); - let mut stream = be.stream_all::(p.clone()).await?; - #[derive(Default, Clone, Copy, Add)] struct Info { count: u64, @@ -59,7 +55,8 @@ pub(super) async fn execute( info[BlobType::Data].min_pack_size = u64::MAX; let mut info_delete = BlobTypeMap::::default(); - while let Some((_, index)) = stream.try_next().await? { + let p = progress_counter("scanning index..."); + for (_, index) in be.stream_all::(p.clone())? { for pack in &index.packs { info[pack.blob_type()].add_pack(pack); @@ -109,7 +106,7 @@ pub(super) async fn execute( Ok(()) } -async fn fileinfo(text: &str, be: &impl ReadBackend) -> Result<()> { +fn fileinfo(text: &str, be: &impl ReadBackend) -> Result<()> { info!("scanning files..."); let mut table = Table::new(); diff --git a/src/commands/restore.rs b/src/commands/restore.rs index e0ebc42..812e513 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -67,9 +67,9 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) } let (id, path) = opts.snap.split_once(':').unwrap_or((&opts.snap, "")); - let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter("")).await?; + let snap = SnapshotFile::from_str(be, id, |_| true, progress_counter(""))?; - let index = IndexBackend::new(be, progress_counter("")).await?; + let index = IndexBackend::new(be, progress_counter(""))?; let tree = Tree::subtree_id(&index, snap.tree, Path::new(path))?; let dest = LocalBackend::new(&opts.dest); diff --git a/src/commands/snapshots.rs b/src/commands/snapshots.rs index fdcd70d..09c2a06 100644 --- a/src/commands/snapshots.rs +++ b/src/commands/snapshots.rs @@ -43,7 +43,7 @@ pub(super) struct Opts { ids: Vec, } -pub(super) async fn execute( +pub(super) fn execute( be: &impl DecryptReadBackend, mut opts: Opts, config_file: RusticConfig, @@ -51,10 +51,9 @@ pub(super) async fn execute( config_file.merge_into("snapshot-filter", &mut opts.filter)?; let groups = match &opts.ids[..] { - [] => SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by).await?, + [] => SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by)?, [id] if id == "latest" => { - SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by) - .await? + SnapshotFile::group_from_backend(be, &opts.filter, &opts.group_by)? .into_iter() .map(|(group, mut snaps)| { snaps.sort_unstable(); @@ -67,7 +66,7 @@ pub(super) async fn execute( } _ => vec![( SnapshotGroup::default(), - SnapshotFile::from_ids(be, &opts.ids).await?, + SnapshotFile::from_ids(be, &opts.ids)?, )], }; diff --git a/src/commands/tag.rs b/src/commands/tag.rs index 7758905..8e100a7 100644 --- a/src/commands/tag.rs +++ b/src/commands/tag.rs @@ -68,7 +68,7 @@ pub(super) struct Opts { ids: Vec, } -pub(super) async fn execute( +pub(super) fn execute( be: &impl DecryptFullBackend, mut opts: Opts, config_file: RusticConfig, @@ -76,8 +76,8 @@ pub(super) async fn execute( config_file.merge_into("snapshot-filter", &mut opts.filter)?; let snapshots = match opts.ids.is_empty() { - true => SnapshotFile::all_from_backend(be, &opts.filter).await?, - false => SnapshotFile::from_ids(be, &opts.ids).await?, + true => SnapshotFile::all_from_backend(be, &opts.filter)?, + false => SnapshotFile::from_ids(be, &opts.ids)?, }; let delete = match ( @@ -109,11 +109,10 @@ pub(super) async fn execute( ), (false, false) => { let p = progress_counter("saving new snapshots..."); - be.save_list(snapshots, p).await?; + be.save_list(snapshots, p)?; let p = progress_counter("deleting old snapshots..."); - be.delete_list(FileType::Snapshot, true, old_snap_ids, p) - .await?; + be.delete_list(FileType::Snapshot, true, old_snap_ids, p)?; } } Ok(()) diff --git a/src/index/mod.rs b/src/index/mod.rs index dc5d4c5..17035bb 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -2,11 +2,9 @@ use std::num::NonZeroU32; use std::sync::Arc; use anyhow::{anyhow, Result}; -use async_trait::async_trait; use bytes::Bytes; use derive_getters::Getters; use derive_more::Constructor; -use futures::StreamExt; use indicatif::ProgressBar; use crate::backend::{DecryptReadBackend, FileType}; @@ -83,7 +81,6 @@ pub trait ReadIndex { } } -#[async_trait] pub trait IndexedBackend: ReadIndex + Clone + Sync + Send + 'static { type Backend: DecryptReadBackend; @@ -124,31 +121,23 @@ impl IndexBackend { } } - async fn new_from_collector( - be: &BE, - p: ProgressBar, - mut collector: IndexCollector, - ) -> Result { + fn new_from_collector(be: &BE, p: ProgressBar, mut collector: IndexCollector) -> Result { p.set_prefix("reading index..."); - let mut stream = be - .stream_all::(p.clone()) - .await? - .map(|i| i.unwrap().1); - - while let Some(index) = stream.next().await { - collector.extend(index.packs); + for (_, i) in be.stream_all::(p.clone())? { + collector.extend(i.packs); } + p.finish(); Ok(Self::new_from_index(be, collector.into_index())) } - pub async fn new(be: &BE, p: ProgressBar) -> Result { - Self::new_from_collector(be, p, IndexCollector::new(IndexType::Full)).await + pub fn new(be: &BE, p: ProgressBar) -> Result { + Self::new_from_collector(be, p, IndexCollector::new(IndexType::Full)) } - pub async fn only_full_trees(be: &BE, p: ProgressBar) -> Result { - Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees)).await + pub fn only_full_trees(be: &BE, p: ProgressBar) -> Result { + Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees)) } pub fn into_index(self) -> Index { diff --git a/src/repo/snapshotfile.rs b/src/repo/snapshotfile.rs index 0788f46..b29eaea 100644 --- a/src/repo/snapshotfile.rs +++ b/src/repo/snapshotfile.rs @@ -6,7 +6,6 @@ use anyhow::{anyhow, bail, Result}; use chrono::{DateTime, Local}; use clap::Parser; use derivative::Derivative; -use futures::{future, TryStreamExt}; use indicatif::ProgressBar; use log::*; use merge::Merge; @@ -112,33 +111,33 @@ impl SnapshotFile { Ok(Self::set_id((*id, be.get_file(id)?))) } - pub async fn from_str( + pub fn from_str( be: &B, string: &str, - predicate: impl FnMut(&Self) -> bool, + predicate: impl FnMut(&Self) -> bool + Send + Sync, p: ProgressBar, ) -> Result { match string { - "latest" => Self::latest(be, predicate, p).await, + "latest" => Self::latest(be, predicate, p), _ => Self::from_id(be, string), } } /// Get the latest SnapshotFile from the backend - pub async fn latest( + pub fn latest( be: &B, - predicate: impl FnMut(&Self) -> bool, + predicate: impl FnMut(&Self) -> bool + Send + Sync, p: ProgressBar, ) -> Result { p.set_prefix("getting latest snapshot..."); let mut latest: Option = None; let mut pred = predicate; - let mut snaps = be.stream_all::(p.clone()).await?; - while let Some((id, mut snap)) = snaps.try_next().await? { + for (id, mut snap) in be.stream_all::(p.clone())? { if !pred(&snap) { continue; } + snap.id = id; match &latest { Some(l) if l.time > snap.time => {} @@ -159,14 +158,13 @@ impl SnapshotFile { } /// Get a Vector of SnapshotFile from the backend by list of (parts of the) ids - pub async fn from_ids(be: &B, ids: &[String]) -> Result> { + pub fn from_ids(be: &B, ids: &[String]) -> Result> { let ids = be.find_ids(FileType::Snapshot, ids)?; Ok(be - .stream_list::(ids, ProgressBar::hidden()) - .await? - .map_ok(Self::set_id) - .try_collect() - .await?) + .stream_list::(ids, ProgressBar::hidden())? + .into_iter() + .map(Self::set_id) + .collect()) } fn cmp_group(&self, crit: &SnapshotGroupCriterion, other: &Self) -> Ordering { @@ -199,12 +197,12 @@ impl SnapshotFile { /// Get SnapshotFiles which match the filter grouped by the group criterion /// from the backend - pub async fn group_from_backend( + pub fn group_from_backend( be: &B, filter: &SnapshotFilter, crit: &SnapshotGroupCriterion, ) -> Result)>> { - let mut snaps = Self::all_from_backend(be, filter).await?; + let mut snaps = Self::all_from_backend(be, filter)?; snaps.sort_unstable_by(|sn1, sn2| sn1.cmp_group(crit, sn2)); let mut result = Vec::new(); @@ -233,17 +231,16 @@ impl SnapshotFile { Ok(result) } - pub async fn all_from_backend( + pub fn all_from_backend( be: &B, filter: &SnapshotFilter, ) -> Result> { Ok(be - .stream_all::(ProgressBar::hidden()) - .await? - .map_ok(Self::set_id) - .try_filter(|sn| future::ready(sn.matches(filter))) - .try_collect() - .await?) + .stream_all::(ProgressBar::hidden())? + .into_iter() + .map(Self::set_id) + .filter(|sn| sn.matches(filter)) + .collect()) } pub fn matches(&self, filter: &SnapshotFilter) -> bool {