diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index 6a90c75..eb1a0f3 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -37,7 +37,7 @@ pub trait DecryptReadBackend: ReadBackend { ) -> Result { let mut data = self.decrypt(&self.read_partial(tpe, id, cacheable, offset, length)?)?; if let Some(length) = uncompressed_length { - data = decode_all(&*data).unwrap(); + data = decode_all(&*data)?; if data.len() != length.get() as usize { bail!("length of uncompressed data does not match!"); } @@ -50,20 +50,24 @@ pub trait DecryptReadBackend: ReadBackend { Ok(serde_json::from_slice(&data)?) } - fn stream_all(&self, p: ProgressBar) -> Result> { + fn stream_all(&self, p: ProgressBar) -> Result>> { let list = self.list(F::TYPE)?; self.stream_list(list, p) } - fn stream_list(&self, list: Vec, p: ProgressBar) -> Result> { + fn stream_list( + &self, + list: Vec, + p: ProgressBar, + ) -> Result>> { p.set_length(list.len() as u64); let (tx, rx) = unbounded(); list.into_par_iter() .for_each_with((self, p, tx), |(be, p, tx), id| { - let file = be.get_file::(&id).unwrap(); + let file = be.get_file::(&id).map(|file| (id, file)); p.inc(1); - tx.send((id, file)).unwrap(); + tx.send(file).unwrap(); }); Ok(rx) } diff --git a/src/commands/check.rs b/src/commands/check.rs index aa3ec9f..99a821b 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -4,6 +4,7 @@ use anyhow::Result; use bytes::Bytes; use clap::Parser; use indicatif::ProgressBar; +use itertools::Itertools; use log::*; use rayon::prelude::*; use zstd::stream::decode_all; @@ -31,7 +32,7 @@ pub(super) struct Opts { } pub(super) fn execute( - be: &(impl DecryptReadBackend + Unpin), + be: &impl DecryptReadBackend, cache: &Option, hot_be: &Option, raw_be: &impl ReadBackend, @@ -211,7 +212,8 @@ fn check_packs( }; let p = progress_counter("reading index..."); - for (_, index) in be.stream_all::(p.clone())? { + for index in be.stream_all::(p.clone())? { + let index = index?.1; index_collector.extend(index.packs.clone()); for p in index.packs { process_pack(p); @@ -254,14 +256,14 @@ fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap) -> Resul } // check if all snapshots and contained trees can be loaded and contents exist in the index -fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> { +fn check_snapshots(index: &impl IndexedBackend) -> Result<()> { let p = progress_counter("reading snapshots..."); let snap_trees: Vec<_> = index .be() .stream_all::(p.clone())? .iter() - .map(|(_, snap)| snap.tree) - .collect(); + .map_ok(|(_, snap)| snap.tree) + .try_collect()?; p.finish(); let p = progress_counter("checking trees..."); diff --git a/src/commands/list.rs b/src/commands/list.rs index 3b9e5f9..2f9e67a 100644 --- a/src/commands/list.rs +++ b/src/commands/list.rs @@ -16,8 +16,8 @@ pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> { let tpe = match opts.tpe.as_str() { // special treatment for listing blobs: read the index and display it "blobs" => { - for (_, index) in be.stream_all::(ProgressBar::hidden())? { - for pack in index.packs { + for index in be.stream_all::(ProgressBar::hidden())? { + for pack in index?.1.packs { for blob in pack.blobs { println!("{:?} {}", blob.tpe, blob.id.to_hex()); } diff --git a/src/commands/prune.rs b/src/commands/prune.rs index 27834e7..6fea832 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -8,6 +8,7 @@ use bytesize::ByteSize; use chrono::{DateTime, Duration, Local}; use clap::{AppSettings, Parser}; use derive_more::Add; +use itertools::Itertools; use log::*; use rayon::prelude::*; @@ -101,7 +102,8 @@ pub(super) fn execute( let p = progress_counter("reading index..."); let mut index_collector = IndexCollector::new(IndexType::OnlyTrees); - for (id, index) in be.stream_all::(p.clone())? { + for index in be.stream_all::(p.clone())? { + let (id, index) = index?; index_collector.extend(index.packs.clone()); // we add the trees from packs_to_delete to the index such that searching for // used blobs doesn't abort if they are already marked for deletion @@ -1103,8 +1105,8 @@ fn find_used_blobs( .be() .stream_list::(list, p.clone())? .into_iter() - .map(|(_, snap)| snap.tree) - .collect(); + .map_ok(|(_, snap)| snap.tree) + .try_collect()?; p.finish(); let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect(); diff --git a/src/commands/repair.rs b/src/commands/repair.rs index 0f51748..2b79a54 100644 --- a/src/commands/repair.rs +++ b/src/commands/repair.rs @@ -140,7 +140,8 @@ fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<()> { }; let p = progress_counter("reading index..."); - for (index_id, index) in be.stream_all::(p.clone())? { + for index in be.stream_all::(p.clone())? { + let (index_id, index) = index?; let mut new_index = IndexFile::default(); let mut changed = false; for p in index.packs { diff --git a/src/commands/repoinfo.rs b/src/commands/repoinfo.rs index c1f6b88..cf6a516 100644 --- a/src/commands/repoinfo.rs +++ b/src/commands/repoinfo.rs @@ -55,7 +55,8 @@ pub(super) fn execute( let mut info_delete = BlobTypeMap::::default(); let p = progress_counter("scanning index..."); - for (_, index) in be.stream_all::(p.clone())? { + for index in be.stream_all::(p.clone())? { + let index = index?.1; for pack in &index.packs { info[pack.blob_type()].add_pack(pack); diff --git a/src/index/mod.rs b/src/index/mod.rs index 17035bb..0fe39e1 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -123,8 +123,8 @@ impl IndexBackend { fn new_from_collector(be: &BE, p: ProgressBar, mut collector: IndexCollector) -> Result { p.set_prefix("reading index..."); - for (_, i) in be.stream_all::(p.clone())? { - collector.extend(i.packs); + for index in be.stream_all::(p.clone())? { + collector.extend(index?.1.packs); } p.finish(); diff --git a/src/repo/snapshotfile.rs b/src/repo/snapshotfile.rs index 287c29e..0fcedb7 100644 --- a/src/repo/snapshotfile.rs +++ b/src/repo/snapshotfile.rs @@ -140,7 +140,8 @@ impl SnapshotFile { let mut latest: Option = None; let mut pred = predicate; - for (id, mut snap) in be.stream_all::(p.clone())? { + for snap in be.stream_all::(p.clone())? { + let (id, mut snap) = snap?; if !pred(&snap) { continue; } @@ -167,11 +168,10 @@ impl SnapshotFile { /// Get a Vector of SnapshotFile from the backend by list of (parts of the) ids pub fn from_ids(be: &B, ids: &[String]) -> Result> { let ids = be.find_ids(FileType::Snapshot, ids)?; - Ok(be - .stream_list::(ids, ProgressBar::hidden())? + be.stream_list::(ids, ProgressBar::hidden())? .into_iter() - .map(Self::set_id) - .collect()) + .map_ok(Self::set_id) + .try_collect() } fn cmp_group(&self, crit: &SnapshotGroupCriterion, other: &Self) -> Ordering { @@ -222,7 +222,7 @@ impl SnapshotFile { let mut result = Vec::new(); for (group, snaps) in &snaps .into_iter() - .group_by(|sn| SnapshotGroup::from_sn(&sn, crit)) + .group_by(|sn| SnapshotGroup::from_sn(sn, crit)) { result.push((group, snaps.collect())); } @@ -234,12 +234,11 @@ impl SnapshotFile { be: &B, filter: &SnapshotFilter, ) -> Result> { - Ok(be - .stream_all::(ProgressBar::hidden())? + be.stream_all::(ProgressBar::hidden())? .into_iter() - .map(Self::set_id) - .filter(|sn| sn.matches(filter)) - .collect()) + .map_ok(Self::set_id) + .filter_ok(|sn| sn.matches(filter)) + .try_collect() } pub fn matches(&self, filter: &SnapshotFilter) -> bool {