Merge pull request #250 from rustic-rs/check-read-data

check: Add --read-data
This commit is contained in:
aawsome 2022-10-07 13:05:26 +02:00 committed by GitHub
commit 4f8b21fac5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 404 additions and 87 deletions

View File

@ -3,7 +3,6 @@ use std::num::NonZeroU32;
use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Result};
use binrw::{io::Cursor, BinWrite};
use bytes::{Bytes, BytesMut};
use chrono::Local;
use tokio::{spawn, task::JoinHandle};
@ -14,7 +13,7 @@ use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType};
use crate::crypto::{CryptoKey, Hasher};
use crate::id::Id;
use crate::index::SharedIndexer;
use crate::repo::{ConfigFile, IndexBlob, IndexPack};
use crate::repo::{ConfigFile, IndexBlob, IndexPack, PackHeaderLength, PackHeaderRef};
const KB: u32 = 1024;
const MB: u32 = 1024 * KB;
@ -202,68 +201,22 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
/// writes header and length of header to packfile
pub async fn write_header(&mut self) -> Result<()> {
#[derive(BinWrite)]
#[bw(little)]
struct PackHeaderLength(u32);
#[derive(BinWrite)]
#[bw(little)]
struct PackHeaderEntry {
tpe: u8,
len: u32,
id: Id,
}
#[derive(BinWrite)]
#[bw(little)]
struct PackHeaderEntryComp {
tpe: u8,
len: u32,
len_data: u32,
id: Id,
}
// collect header entries
let mut writer = Cursor::new(Vec::new());
for blob in &self.index.blobs {
match blob.uncompressed_length {
None => PackHeaderEntry {
tpe: match blob.tpe {
BlobType::Data => 0b00,
BlobType::Tree => 0b01,
},
len: blob.length,
id: blob.id,
}
.write(&mut writer)?,
Some(len) => PackHeaderEntryComp {
tpe: match blob.tpe {
BlobType::Data => 0b10,
BlobType::Tree => 0b11,
},
len: blob.length,
len_data: len.get(),
id: blob.id,
}
.write(&mut writer)?,
};
}
// comput the pack header
let data = PackHeaderRef::from_index_pack(&self.index).to_binary()?;
// encrypt and write to pack file
let data = writer.into_inner();
let data = self
.be
.key()
.encrypt_data(&data)
.map_err(|_| anyhow!("crypto error"))?;
let headerlen = data.len();
let headerlen = data.len().try_into()?;
self.write_data(&data).await?;
// finally write length of header unencrypted to pack file
let mut writer = Cursor::new(Vec::new());
PackHeaderLength(headerlen.try_into()?).write(&mut writer)?;
let data = writer.into_inner();
self.write_data(&data).await?;
self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?)
.await?;
Ok(())
}

View File

@ -1,18 +1,24 @@
use std::collections::HashMap;
use anyhow::Result;
use bytes::Bytes;
use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt};
use indicatif::ProgressBar;
use log::*;
use tokio::task::spawn_blocking;
use zstd::stream::decode_all;
use super::{progress_bytes, progress_counter};
use crate::backend::{Cache, DecryptReadBackend, FileType, ReadBackend};
use crate::backend::{Cache, DecryptFullBackend, DecryptReadBackend, FileType, ReadBackend};
use crate::blob::{BlobType, NodeType, TreeStreamerOnce};
use crate::commands::helpers::progress_spinner;
use crate::crypto::{hash, CryptoKey};
use crate::id::Id;
use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend};
use crate::repo::{IndexFile, IndexPack, SnapshotFile};
use crate::repo::{
IndexFile, IndexPack, PackHeader, PackHeaderLength, PackHeaderRef, SnapshotFile,
};
#[derive(Parser)]
pub(super) struct Opts {
@ -26,7 +32,7 @@ pub(super) struct Opts {
}
pub(super) async fn execute(
be: &(impl DecryptReadBackend + Unpin),
be: &(impl DecryptFullBackend + Unpin), // TODO: this should be a DecryptReadBackend; see check_pack()
cache: &Option<Cache>,
hot_be: &Option<impl ReadBackend>,
raw_be: &impl ReadBackend,
@ -54,7 +60,7 @@ pub(super) async fn execute(
}
}
let index_collector = check_packs(be, hot_be).await?;
let index_collector = check_packs(be, hot_be, opts.read_data).await?;
if !opts.trust_cache {
if let Some(cache) = &cache {
@ -64,12 +70,33 @@ pub(super) async fn execute(
}
}
let be = IndexBackend::new_from_index(be, index_collector.into_index());
let index_be = IndexBackend::new_from_index(be, index_collector.into_index());
check_snapshots(&be).await?;
check_snapshots(&index_be).await?;
if opts.read_data {
unimplemented!()
let p = progress_counter("reading pack data...");
stream::iter(index_be.into_index().into_iter().map(|pack| {
let be = be.clone();
let p = p.clone();
(pack, be, p)
}))
// TODO: Make concurrency (4) customizable
.for_each_concurrent(4, |(pack, be, p)| async move {
let id = pack.id;
let data = be.read_full(FileType::Pack, &id).await.unwrap();
spawn_blocking(move || {
match check_pack(&be, pack, data) {
Ok(()) => {}
Err(err) => error!("Error reading pack {id} : {err}",),
}
p.inc(1);
})
.await
.unwrap()
})
.await;
p.finish();
}
Ok(())
@ -157,10 +184,15 @@ async fn check_cache_files(
async fn check_packs(
be: &impl DecryptReadBackend,
hot_be: &Option<impl ReadBackend>,
read_data: bool,
) -> Result<IndexCollector> {
let mut packs = HashMap::new();
let mut tree_packs = HashMap::new();
let mut index_collector = IndexCollector::new(IndexType::FullTrees);
let mut index_collector = IndexCollector::new(if read_data {
IndexType::Full
} else {
IndexType::FullTrees
});
let mut process_pack = |p: IndexPack| {
let blob_type = p.blob_type();
@ -289,3 +321,73 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> {
Ok(())
}
fn check_pack(
be: &impl DecryptFullBackend, // TODO: this should be a DecryptReadBackend; we just additionally need the key() method
index_pack: IndexPack,
mut data: Bytes,
) -> Result<()> {
let id = index_pack.id;
let size = index_pack.pack_size();
if data.len() != size as usize {
error!(
"pack {id}: data size does not match expected size. Read: {} bytes, expected: {size} bytes",
data.len()
);
return Ok(());
}
let comp_id = hash(&data);
if id != comp_id {
error!("pack {id}: Hash mismatch. Computed hash: {comp_id}");
return Ok(());
}
// check header length
let header_len = PackHeaderRef::from_index_pack(&index_pack).size();
let pack_header_len = PackHeaderLength::from_binary(&data.split_off(data.len() - 4))?.to_u32();
if pack_header_len != header_len {
error!("pack {id}: Header length in pack file doesn't match index. In pack: {pack_header_len}, calculated: {header_len}");
return Ok(());
}
// check header
let header = be
.key()
.decrypt_data(&data.split_off(data.len() - header_len as usize))?;
let pack_blobs = PackHeader::from_binary(&header)?.into_blobs();
let mut blobs = index_pack.blobs;
blobs.sort_unstable_by_key(|b| b.offset);
if pack_blobs != blobs {
error!("pack {id}: Header from pack file does not match the index");
debug!("pack file header: {pack_blobs:?}");
debug!("index: {:?}", blobs);
return Ok(());
}
// check blobs
for blob in blobs {
let blob_id = blob.id;
let mut blob_data = be
.key()
.decrypt_data(&data.split_to(blob.length as usize))?;
// TODO: this is identical to backend/decrypt.rs; unify these two parts!
if let Some(length) = blob.uncompressed_length {
blob_data = decode_all(&*blob_data).unwrap();
if blob_data.len() != length.get() as usize {
error!("pack {id}, blob {blob_id}: Actual uncompressed length does not fit saved uncompressed length");
return Ok(());
}
}
let comp_id = hash(&blob_data);
if blob.id != comp_id {
error!("pack {id}, blob {blob_id}: Hash mismatch. Computed hash: {comp_id}");
return Ok(());
}
}
Ok(())
}

View File

@ -16,7 +16,7 @@ use crate::blob::{BlobType, BlobTypeMap, NodeType, PackSizer, Repacker, TreeStre
use crate::commands::helpers::progress_spinner;
use crate::id::Id;
use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend, Indexer, ReadIndex};
use crate::repo::{ConfigFile, IndexBlob, IndexFile, IndexPack, SnapshotFile};
use crate::repo::{ConfigFile, HeaderEntry, IndexBlob, IndexFile, IndexPack, SnapshotFile};
#[derive(Parser)]
#[clap(global_setting(AppSettings::DeriveDisplayOrder))]
@ -864,10 +864,10 @@ impl Pruner {
// header size.
let tree_size_after_prune = self.stats.size[BlobType::Tree].total_after_prune()
+ self.stats.blobs[BlobType::Tree].total_after_prune()
* IndexPack::HEADER_LEN_COMPRESSED as u64;
* HeaderEntry::ENTRY_LEN_COMPRESSED as u64;
let data_size_after_prune = self.stats.size[BlobType::Data].total_after_prune()
+ self.stats.blobs[BlobType::Data].total_after_prune()
* IndexPack::HEADER_LEN_COMPRESSED as u64;
* HeaderEntry::ENTRY_LEN_COMPRESSED as u64;
let mut tree_repacker = Repacker::new(
be.clone(),

View File

@ -1,6 +1,6 @@
use std::fmt;
use binrw::BinWrite;
use binrw::{BinRead, BinWrite};
use derive_more::{Constructor, Display};
use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize};
@ -19,6 +19,7 @@ use thiserror::Error;
Serialize,
Deserialize,
BinWrite,
BinRead,
Display,
)]
#[display(fmt = "{}", "&self.to_hex()[0..8]")]

View File

@ -3,7 +3,7 @@ use std::num::NonZeroU32;
use super::{BlobType, IndexEntry, ReadIndex};
use crate::blob::BlobTypeMap;
use crate::id::Id;
use crate::repo::IndexPack;
use crate::repo::{IndexBlob, IndexPack};
#[derive(Debug, PartialEq, Eq)]
struct SortedEntry {
@ -20,6 +20,7 @@ pub(crate) enum IndexType {
OnlyTrees,
}
#[derive(Debug)]
enum EntriesVariants {
None,
Ids(Vec<Id>),
@ -42,11 +43,20 @@ pub(crate) struct TypeIndexCollector {
#[derive(Default)]
pub(crate) struct IndexCollector(BlobTypeMap<TypeIndexCollector>);
pub struct PackIndexes {
c: Index,
tpe: BlobType,
idx: BlobTypeMap<(usize, usize)>,
}
#[derive(Debug)]
pub(crate) struct TypeIndex {
packs: Vec<Id>,
entries: EntriesVariants,
total_size: u64,
}
#[derive(Debug)]
pub struct Index(BlobTypeMap<TypeIndex>);
impl IndexCollector {
@ -67,7 +77,7 @@ impl IndexCollector {
&self.0[BlobType::Tree].packs
}
// Turns Collector into an index by sorting the entries.
// Turns Collector into an index by sorting the entries by ID.
pub fn into_index(self) -> Index {
Index(self.0.map(|_, mut tc| {
match &mut tc.entries {
@ -125,6 +135,74 @@ impl Extend<IndexPack> for IndexCollector {
}
}
impl Iterator for PackIndexes {
type Item = IndexPack;
fn next(&mut self) -> Option<Self::Item> {
let (pack_idx, idx) = loop {
let (pack_idx, idx) = &mut self.idx[self.tpe];
if *pack_idx >= self.c.0[self.tpe].packs.len() {
if self.tpe == BlobType::Data {
return None;
} else {
self.tpe = BlobType::Data;
}
} else {
break (pack_idx, idx);
}
};
let mut pack = IndexPack::default();
pack.set_id(self.c.0[self.tpe].packs[*pack_idx]);
if let EntriesVariants::FullEntries(entries) = &self.c.0[self.tpe].entries {
while *idx < entries.len() && entries[*idx].pack_idx == *pack_idx {
let entry = &entries[*idx];
pack.blobs.push(IndexBlob {
id: entry.id,
tpe: self.tpe,
offset: entry.offset,
length: entry.length,
uncompressed_length: entry.uncompressed_length,
});
*idx += 1;
}
}
*pack_idx += 1;
Some(pack)
}
}
impl IntoIterator for Index {
type Item = IndexPack;
type IntoIter = PackIndexes;
// Turns Collector into an iterator yielding PackIndex by sorting the entries by pack.
fn into_iter(mut self) -> Self::IntoIter {
for (_, tc) in self.0.iter_mut() {
if let EntriesVariants::FullEntries(entries) = &mut tc.entries {
entries.sort_unstable_by(|e1, e2| e1.pack_idx.cmp(&e2.pack_idx))
}
}
PackIndexes {
c: Index(self.0.map(|_, mut tc| {
if let EntriesVariants::FullEntries(entries) = &mut tc.entries {
entries.sort_unstable_by(|e1, e2| e1.pack_idx.cmp(&e2.pack_idx))
}
TypeIndex {
packs: tc.packs,
entries: tc.entries,
total_size: tc.total_size,
}
})),
tpe: BlobType::Tree,
idx: BlobTypeMap::default(),
}
}
}
impl ReadIndex for Index {
fn get_id(&self, blob_type: &BlobType, id: &Id) -> Option<IndexEntry> {
let vec = match &self.0[*blob_type].entries {

View File

@ -145,6 +145,10 @@ impl<BE: DecryptReadBackend> IndexBackend<BE> {
pub async fn only_full_trees(be: &BE, p: ProgressBar) -> Result<Self> {
Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees)).await
}
pub fn into_index(self) -> Index {
Arc::try_unwrap(self.index).expect("index still in use")
}
}
impl<BE: DecryptReadBackend> IndexedBackend for IndexBackend<BE> {

View File

@ -8,6 +8,8 @@ use crate::backend::{FileType, RepoFile};
use crate::blob::BlobType;
use crate::id::Id;
use super::PackHeaderRef;
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct IndexFile {
#[serde(skip_serializing_if = "Option::is_none")]
@ -42,15 +44,6 @@ pub struct IndexPack {
}
impl IndexPack {
// 4 equals the size of blob::packer::PackHeaderLength
// 32 equals the size of the crypto overhead
pub const PACK_OVERHEAD: u32 = 4 + 32;
// this equals the size of blob::packer::PackHeaderEntry
pub const HEADER_LEN: u32 = 37;
// this equals the size of blob::packer::PackHeaderEntryComp
pub const HEADER_LEN_COMPRESSED: u32 = 41;
pub fn set_id(&mut self, id: Id) {
self.id = id;
}
@ -74,15 +67,8 @@ impl IndexPack {
// calculate the pack size from the contained blobs
pub fn pack_size(&self) -> u32 {
self.size.unwrap_or_else(|| {
self.blobs.iter().fold(Self::PACK_OVERHEAD, |acc, blob| {
acc + blob.length
+ match blob.uncompressed_length {
None => Self::HEADER_LEN,
Some(_) => Self::HEADER_LEN_COMPRESSED,
}
})
})
self.size
.unwrap_or_else(|| PackHeaderRef::from_index_pack(self).pack_size())
}
/// returns the blob type of the pack. Note that only packs with

View File

@ -1,10 +1,12 @@
mod configfile;
mod indexfile;
mod keyfile;
mod packfile;
mod snapshotfile;
pub use super::id::*;
pub use configfile::*;
pub use indexfile::*;
pub use keyfile::*;
pub use packfile::*;
pub use snapshotfile::*;

191
src/repo/packfile.rs Normal file
View File

@ -0,0 +1,191 @@
use std::num::NonZeroU32;
use anyhow::Result;
use binrw::{io::Cursor, BinRead, BinWrite};
use crate::blob::BlobType;
use crate::id::Id;
use super::{IndexBlob, IndexPack};
// 32 equals the size of the crypto overhead
// TODO: use from crypto mod
pub const COMP_OVERHEAD: u32 = 32;
pub const LENGTH_LEN: u32 = 4;
#[derive(BinWrite, BinRead)]
#[brw(little)]
pub struct PackHeaderLength(u32);
impl PackHeaderLength {
pub fn from_u32(len: u32) -> Self {
Self(len)
}
pub fn to_u32(&self) -> u32 {
self.0
}
/// Read pack header length from binary representation
pub fn from_binary(data: &[u8]) -> Result<Self> {
let mut reader = Cursor::new(data);
Ok(PackHeaderLength::read(&mut reader)?)
}
/// generate the binary representation of the pack header length
pub fn to_binary(&self) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::with_capacity(4));
self.write(&mut writer)?;
Ok(writer.into_inner())
}
}
#[derive(BinRead, BinWrite)]
#[brw(little)]
pub enum HeaderEntry {
#[brw(magic(0u8))]
Data { len: u32, id: Id },
#[brw(magic(1u8))]
Tree { len: u32, id: Id },
#[brw(magic(2u8))]
CompData { len: u32, len_data: u32, id: Id },
#[brw(magic(3u8))]
CompTree { len: u32, len_data: u32, id: Id },
}
impl HeaderEntry {
pub const ENTRY_LEN: u32 = 37;
pub const ENTRY_LEN_COMPRESSED: u32 = 41;
fn from_blob(blob: &IndexBlob) -> Self {
match (blob.uncompressed_length, blob.tpe) {
(None, BlobType::Data) => Self::Data {
len: blob.length,
id: blob.id,
},
(None, BlobType::Tree) => Self::Tree {
len: blob.length,
id: blob.id,
},
(Some(len), BlobType::Data) => Self::CompData {
len: blob.length,
len_data: len.get(),
id: blob.id,
},
(Some(len), BlobType::Tree) => Self::CompTree {
len: blob.length,
len_data: len.get(),
id: blob.id,
},
}
}
// the length of this header entry
fn length(&self) -> u32 {
match &self {
Self::Data { len: _, id: _ } => Self::ENTRY_LEN,
Self::Tree { len: _, id: _ } => Self::ENTRY_LEN,
Self::CompData {
len: _,
len_data: _,
id: _,
} => Self::ENTRY_LEN_COMPRESSED,
Self::CompTree {
len: _,
len_data: _,
id: _,
} => Self::ENTRY_LEN_COMPRESSED,
}
}
fn into_blob(self, offset: u32) -> IndexBlob {
match self {
Self::Data { len, id } => IndexBlob {
id,
length: len,
tpe: BlobType::Data,
uncompressed_length: None,
offset,
},
Self::Tree { len, id } => IndexBlob {
id,
length: len,
tpe: BlobType::Tree,
uncompressed_length: None,
offset,
},
Self::CompData { len, id, len_data } => IndexBlob {
id,
length: len,
tpe: BlobType::Data,
uncompressed_length: NonZeroU32::new(len_data),
offset,
},
Self::CompTree { len, id, len_data } => IndexBlob {
id,
length: len,
tpe: BlobType::Tree,
uncompressed_length: NonZeroU32::new(len_data),
offset,
},
}
}
}
pub struct PackHeader(Vec<IndexBlob>);
impl PackHeader {
/// Read the binary representation of the pack header
pub fn from_binary(pack: &[u8]) -> Result<Self> {
let mut reader = Cursor::new(pack);
let mut offset = 0;
let mut blobs = Vec::new();
loop {
let blob = match HeaderEntry::read(&mut reader) {
Ok(entry) => entry.into_blob(offset),
Err(err) if err.is_eof() => break,
Err(err) => return Err(err.into()),
};
offset += blob.length;
blobs.push(blob);
}
Ok(Self(blobs))
}
pub fn into_blobs(self) -> Vec<IndexBlob> {
self.0
}
}
pub struct PackHeaderRef<'a>(&'a [IndexBlob]);
impl<'a> PackHeaderRef<'a> {
pub fn from_index_pack(pack: &'a IndexPack) -> Self {
Self(&pack.blobs)
}
// calculate the pack header size from the contained blobs
pub fn size(&self) -> u32 {
self.0.iter().fold(COMP_OVERHEAD, |acc, blob| {
acc + HeaderEntry::from_blob(blob).length()
})
}
// calculate the pack size from the contained blobs
pub fn pack_size(&self) -> u32 {
self.0.iter().fold(COMP_OVERHEAD + LENGTH_LEN, |acc, blob| {
acc + blob.length + HeaderEntry::from_blob(blob).length()
})
}
/// generate the binary representation of the pack header
pub fn to_binary(&self) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::with_capacity(self.pack_size() as usize));
// collect header entries
for blob in self.0 {
HeaderEntry::from_blob(blob).write(&mut writer)?;
}
Ok(writer.into_inner())
}
}

View File

@ -49,7 +49,7 @@ teardown () {
assert_success
assert_output -p "2 snapshot(s)"
run $RUSTIC check
run $RUSTIC check --read-data
assert_success
}