DecryptBackend:Better error handling

This commit is contained in:
Alexander Weiss 2022-12-01 23:24:12 +01:00
parent 793de9725d
commit d4ff297724
8 changed files with 39 additions and 30 deletions

View File

@ -37,7 +37,7 @@ pub trait DecryptReadBackend: ReadBackend {
) -> Result<Bytes> {
let mut data = self.decrypt(&self.read_partial(tpe, id, cacheable, offset, length)?)?;
if let Some(length) = uncompressed_length {
data = decode_all(&*data).unwrap();
data = decode_all(&*data)?;
if data.len() != length.get() as usize {
bail!("length of uncompressed data does not match!");
}
@ -50,20 +50,24 @@ pub trait DecryptReadBackend: ReadBackend {
Ok(serde_json::from_slice(&data)?)
}
fn stream_all<F: RepoFile>(&self, p: ProgressBar) -> Result<Receiver<(Id, F)>> {
fn stream_all<F: RepoFile>(&self, p: ProgressBar) -> Result<Receiver<Result<(Id, F)>>> {
let list = self.list(F::TYPE)?;
self.stream_list(list, p)
}
fn stream_list<F: RepoFile>(&self, list: Vec<Id>, p: ProgressBar) -> Result<Receiver<(Id, F)>> {
fn stream_list<F: RepoFile>(
&self,
list: Vec<Id>,
p: ProgressBar,
) -> Result<Receiver<Result<(Id, F)>>> {
p.set_length(list.len() as u64);
let (tx, rx) = unbounded();
list.into_par_iter()
.for_each_with((self, p, tx), |(be, p, tx), id| {
let file = be.get_file::<F>(&id).unwrap();
let file = be.get_file::<F>(&id).map(|file| (id, file));
p.inc(1);
tx.send((id, file)).unwrap();
tx.send(file).unwrap();
});
Ok(rx)
}

View File

@ -4,6 +4,7 @@ use anyhow::Result;
use bytes::Bytes;
use clap::Parser;
use indicatif::ProgressBar;
use itertools::Itertools;
use log::*;
use rayon::prelude::*;
use zstd::stream::decode_all;
@ -31,7 +32,7 @@ pub(super) struct Opts {
}
pub(super) fn execute(
be: &(impl DecryptReadBackend + Unpin),
be: &impl DecryptReadBackend,
cache: &Option<Cache>,
hot_be: &Option<impl ReadBackend>,
raw_be: &impl ReadBackend,
@ -211,7 +212,8 @@ fn check_packs(
};
let p = progress_counter("reading index...");
for (_, index) in be.stream_all::<IndexFile>(p.clone())? {
for index in be.stream_all::<IndexFile>(p.clone())? {
let index = index?.1;
index_collector.extend(index.packs.clone());
for p in index.packs {
process_pack(p);
@ -254,14 +256,14 @@ fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<Id, u32>) -> Resul
}
// check if all snapshots and contained trees can be loaded and contents exist in the index
fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> {
fn check_snapshots(index: &impl IndexedBackend) -> Result<()> {
let p = progress_counter("reading snapshots...");
let snap_trees: Vec<_> = index
.be()
.stream_all::<SnapshotFile>(p.clone())?
.iter()
.map(|(_, snap)| snap.tree)
.collect();
.map_ok(|(_, snap)| snap.tree)
.try_collect()?;
p.finish();
let p = progress_counter("checking trees...");

View File

@ -16,8 +16,8 @@ pub(super) fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<()> {
let tpe = match opts.tpe.as_str() {
// special treatment for listing blobs: read the index and display it
"blobs" => {
for (_, index) in be.stream_all::<IndexFile>(ProgressBar::hidden())? {
for pack in index.packs {
for index in be.stream_all::<IndexFile>(ProgressBar::hidden())? {
for pack in index?.1.packs {
for blob in pack.blobs {
println!("{:?} {}", blob.tpe, blob.id.to_hex());
}

View File

@ -8,6 +8,7 @@ use bytesize::ByteSize;
use chrono::{DateTime, Duration, Local};
use clap::{AppSettings, Parser};
use derive_more::Add;
use itertools::Itertools;
use log::*;
use rayon::prelude::*;
@ -101,7 +102,8 @@ pub(super) fn execute(
let p = progress_counter("reading index...");
let mut index_collector = IndexCollector::new(IndexType::OnlyTrees);
for (id, index) in be.stream_all::<IndexFile>(p.clone())? {
for index in be.stream_all::<IndexFile>(p.clone())? {
let (id, index) = index?;
index_collector.extend(index.packs.clone());
// we add the trees from packs_to_delete to the index such that searching for
// used blobs doesn't abort if they are already marked for deletion
@ -1103,8 +1105,8 @@ fn find_used_blobs(
.be()
.stream_list::<SnapshotFile>(list, p.clone())?
.into_iter()
.map(|(_, snap)| snap.tree)
.collect();
.map_ok(|(_, snap)| snap.tree)
.try_collect()?;
p.finish();
let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect();

View File

@ -140,7 +140,8 @@ fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<()> {
};
let p = progress_counter("reading index...");
for (index_id, index) in be.stream_all::<IndexFile>(p.clone())? {
for index in be.stream_all::<IndexFile>(p.clone())? {
let (index_id, index) = index?;
let mut new_index = IndexFile::default();
let mut changed = false;
for p in index.packs {

View File

@ -55,7 +55,8 @@ pub(super) fn execute(
let mut info_delete = BlobTypeMap::<Info>::default();
let p = progress_counter("scanning index...");
for (_, index) in be.stream_all::<IndexFile>(p.clone())? {
for index in be.stream_all::<IndexFile>(p.clone())? {
let index = index?.1;
for pack in &index.packs {
info[pack.blob_type()].add_pack(pack);

View File

@ -123,8 +123,8 @@ impl<BE: DecryptReadBackend> IndexBackend<BE> {
fn new_from_collector(be: &BE, p: ProgressBar, mut collector: IndexCollector) -> Result<Self> {
p.set_prefix("reading index...");
for (_, i) in be.stream_all::<IndexFile>(p.clone())? {
collector.extend(i.packs);
for index in be.stream_all::<IndexFile>(p.clone())? {
collector.extend(index?.1.packs);
}
p.finish();

View File

@ -140,7 +140,8 @@ impl SnapshotFile {
let mut latest: Option<Self> = None;
let mut pred = predicate;
for (id, mut snap) in be.stream_all::<SnapshotFile>(p.clone())? {
for snap in be.stream_all::<SnapshotFile>(p.clone())? {
let (id, mut snap) = snap?;
if !pred(&snap) {
continue;
}
@ -167,11 +168,10 @@ impl SnapshotFile {
/// Get a Vector of SnapshotFile from the backend by list of (parts of the) ids
pub fn from_ids<B: DecryptReadBackend>(be: &B, ids: &[String]) -> Result<Vec<Self>> {
let ids = be.find_ids(FileType::Snapshot, ids)?;
Ok(be
.stream_list::<Self>(ids, ProgressBar::hidden())?
be.stream_list::<Self>(ids, ProgressBar::hidden())?
.into_iter()
.map(Self::set_id)
.collect())
.map_ok(Self::set_id)
.try_collect()
}
fn cmp_group(&self, crit: &SnapshotGroupCriterion, other: &Self) -> Ordering {
@ -222,7 +222,7 @@ impl SnapshotFile {
let mut result = Vec::new();
for (group, snaps) in &snaps
.into_iter()
.group_by(|sn| SnapshotGroup::from_sn(&sn, crit))
.group_by(|sn| SnapshotGroup::from_sn(sn, crit))
{
result.push((group, snaps.collect()));
}
@ -234,12 +234,11 @@ impl SnapshotFile {
be: &B,
filter: &SnapshotFilter,
) -> Result<Vec<Self>> {
Ok(be
.stream_all::<SnapshotFile>(ProgressBar::hidden())?
be.stream_all::<SnapshotFile>(ProgressBar::hidden())?
.into_iter()
.map(Self::set_id)
.filter(|sn| sn.matches(filter))
.collect())
.map_ok(Self::set_id)
.filter_ok(|sn| sn.matches(filter))
.try_collect()
}
pub fn matches(&self, filter: &SnapshotFilter) -> bool {