From c3c58d761684cb6482a50412131ec4b128c4994e Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sun, 2 Oct 2022 20:36:44 +0200 Subject: [PATCH] check: Add --read-data --- src/blob/packer.rs | 61 ++---------- src/commands/check.rs | 118 +++++++++++++++++++++-- src/commands/prune.rs | 6 +- src/id.rs | 3 +- src/index/binarysorted.rs | 82 +++++++++++++++- src/index/mod.rs | 4 + src/repo/indexfile.rs | 22 +---- src/repo/mod.rs | 2 + src/repo/packfile.rs | 191 ++++++++++++++++++++++++++++++++++++++ test-bats/simple.bats | 2 +- 10 files changed, 404 insertions(+), 87 deletions(-) create mode 100644 src/repo/packfile.rs diff --git a/src/blob/packer.rs b/src/blob/packer.rs index a74bda1..311aac8 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -3,7 +3,6 @@ use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Result}; -use binrw::{io::Cursor, BinWrite}; use bytes::{Bytes, BytesMut}; use chrono::Local; use tokio::{spawn, task::JoinHandle}; @@ -14,7 +13,7 @@ use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType}; use crate::crypto::{CryptoKey, Hasher}; use crate::id::Id; use crate::index::SharedIndexer; -use crate::repo::{ConfigFile, IndexBlob, IndexPack}; +use crate::repo::{ConfigFile, IndexBlob, IndexPack, PackHeaderLength, PackHeaderRef}; const KB: u32 = 1024; const MB: u32 = 1024 * KB; @@ -202,68 +201,22 @@ impl Packer { /// writes header and length of header to packfile pub async fn write_header(&mut self) -> Result<()> { - #[derive(BinWrite)] - #[bw(little)] - struct PackHeaderLength(u32); - - #[derive(BinWrite)] - #[bw(little)] - struct PackHeaderEntry { - tpe: u8, - len: u32, - id: Id, - } - - #[derive(BinWrite)] - #[bw(little)] - struct PackHeaderEntryComp { - tpe: u8, - len: u32, - len_data: u32, - id: Id, - } - - // collect header entries - let mut writer = Cursor::new(Vec::new()); - for blob in &self.index.blobs { - match blob.uncompressed_length { - None => PackHeaderEntry { - tpe: match blob.tpe { - 
BlobType::Data => 0b00, - BlobType::Tree => 0b01, - }, - len: blob.length, - id: blob.id, - } - .write(&mut writer)?, - Some(len) => PackHeaderEntryComp { - tpe: match blob.tpe { - BlobType::Data => 0b10, - BlobType::Tree => 0b11, - }, - len: blob.length, - len_data: len.get(), - id: blob.id, - } - .write(&mut writer)?, - }; - } + // compute the pack header + let data = PackHeaderRef::from_index_pack(&self.index).to_binary()?; // encrypt and write to pack file - let data = writer.into_inner(); let data = self .be .key() .encrypt_data(&data) .map_err(|_| anyhow!("crypto error"))?; - let headerlen = data.len(); + + let headerlen = data.len().try_into()?; self.write_data(&data).await?; // finally write length of header unencrypted to pack file - let mut writer = Cursor::new(Vec::new()); - PackHeaderLength(headerlen.try_into()?).write(&mut writer)?; - let data = writer.into_inner(); - self.write_data(&data).await?; + self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?) + .await?; Ok(()) } diff --git a/src/commands/check.rs b/src/commands/check.rs index 7dcdaa3..e66148a 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -1,18 +1,24 @@ use std::collections::HashMap; use anyhow::Result; +use bytes::Bytes; use clap::Parser; use futures::{stream, StreamExt, TryStreamExt}; use indicatif::ProgressBar; use log::*; +use tokio::task::spawn_blocking; +use zstd::stream::decode_all; use super::{progress_bytes, progress_counter}; -use crate::backend::{Cache, DecryptReadBackend, FileType, ReadBackend}; +use crate::backend::{Cache, DecryptFullBackend, DecryptReadBackend, FileType, ReadBackend}; use crate::blob::{BlobType, NodeType, TreeStreamerOnce}; use crate::commands::helpers::progress_spinner; +use crate::crypto::{hash, CryptoKey}; use crate::id::Id; use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend}; -use crate::repo::{IndexFile, IndexPack, SnapshotFile}; +use crate::repo::{ + IndexFile, IndexPack, PackHeader, 
PackHeaderLength, PackHeaderRef, SnapshotFile, +}; #[derive(Parser)] pub(super) struct Opts { @@ -26,7 +32,7 @@ pub(super) struct Opts { } pub(super) async fn execute( - be: &(impl DecryptReadBackend + Unpin), + be: &(impl DecryptFullBackend + Unpin), // TODO: this should be a DecryptReadBackend; see check_pack() cache: &Option, hot_be: &Option, raw_be: &impl ReadBackend, @@ -54,7 +60,7 @@ pub(super) async fn execute( } } - let index_collector = check_packs(be, hot_be).await?; + let index_collector = check_packs(be, hot_be, opts.read_data).await?; if !opts.trust_cache { if let Some(cache) = &cache { @@ -64,12 +70,33 @@ pub(super) async fn execute( } } - let be = IndexBackend::new_from_index(be, index_collector.into_index()); + let index_be = IndexBackend::new_from_index(be, index_collector.into_index()); - check_snapshots(&be).await?; + check_snapshots(&index_be).await?; if opts.read_data { - unimplemented!() + let p = progress_counter("reading pack data..."); + stream::iter(index_be.into_index().into_iter().map(|pack| { + let be = be.clone(); + let p = p.clone(); + (pack, be, p) + })) + // TODO: Make concurrency (4) customizable + .for_each_concurrent(4, |(pack, be, p)| async move { + let id = pack.id; + let data = be.read_full(FileType::Pack, &id).await.unwrap(); + spawn_blocking(move || { + match check_pack(&be, pack, data) { + Ok(()) => {} + Err(err) => error!("Error reading pack {id} : {err}",), + } + p.inc(1); + }) + .await + .unwrap() + }) + .await; + p.finish(); } Ok(()) @@ -157,10 +184,15 @@ async fn check_cache_files( async fn check_packs( be: &impl DecryptReadBackend, hot_be: &Option, + read_data: bool, ) -> Result { let mut packs = HashMap::new(); let mut tree_packs = HashMap::new(); - let mut index_collector = IndexCollector::new(IndexType::FullTrees); + let mut index_collector = IndexCollector::new(if read_data { + IndexType::Full + } else { + IndexType::FullTrees + }); let mut process_pack = |p: IndexPack| { let blob_type = p.blob_type(); @@ -289,3 
+321,73 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> { Ok(()) } + +fn check_pack( + be: &impl DecryptFullBackend, // TODO: this should be a DecryptReadBackend; we just additionally need the key() method + index_pack: IndexPack, + mut data: Bytes, +) -> Result<()> { + let id = index_pack.id; + let size = index_pack.pack_size(); + if data.len() != size as usize { + error!( + "pack {id}: data size does not match expected size. Read: {} bytes, expected: {size} bytes", + data.len() + ); + return Ok(()); + } + + let comp_id = hash(&data); + if id != comp_id { + error!("pack {id}: Hash mismatch. Computed hash: {comp_id}"); + return Ok(()); + } + + // check header length + let header_len = PackHeaderRef::from_index_pack(&index_pack).size(); + let pack_header_len = PackHeaderLength::from_binary(&data.split_off(data.len() - 4))?.to_u32(); + if pack_header_len != header_len { + error!("pack {id}: Header length in pack file doesn't match index. In pack: {pack_header_len}, calculated: {header_len}"); + return Ok(()); + } + + // check header + let header = be + .key() + .decrypt_data(&data.split_off(data.len() - header_len as usize))?; + + let pack_blobs = PackHeader::from_binary(&header)?.into_blobs(); + let mut blobs = index_pack.blobs; + blobs.sort_unstable_by_key(|b| b.offset); + if pack_blobs != blobs { + error!("pack {id}: Header from pack file does not match the index"); + debug!("pack file header: {pack_blobs:?}"); + debug!("index: {:?}", blobs); + return Ok(()); + } + + // check blobs + for blob in blobs { + let blob_id = blob.id; + let mut blob_data = be + .key() + .decrypt_data(&data.split_to(blob.length as usize))?; + + // TODO: this is identical to backend/decrypt.rs; unify these two parts! 
+ if let Some(length) = blob.uncompressed_length { + blob_data = decode_all(&*blob_data).unwrap(); + if blob_data.len() != length.get() as usize { + error!("pack {id}, blob {blob_id}: Actual uncompressed length does not fit saved uncompressed length"); + return Ok(()); + } + } + + let comp_id = hash(&blob_data); + if blob.id != comp_id { + error!("pack {id}, blob {blob_id}: Hash mismatch. Computed hash: {comp_id}"); + return Ok(()); + } + } + + Ok(()) +} diff --git a/src/commands/prune.rs b/src/commands/prune.rs index 16fc386..1315f81 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -16,7 +16,7 @@ use crate::blob::{BlobType, BlobTypeMap, NodeType, PackSizer, Repacker, TreeStre use crate::commands::helpers::progress_spinner; use crate::id::Id; use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend, Indexer, ReadIndex}; -use crate::repo::{ConfigFile, IndexBlob, IndexFile, IndexPack, SnapshotFile}; +use crate::repo::{ConfigFile, HeaderEntry, IndexBlob, IndexFile, IndexPack, SnapshotFile}; #[derive(Parser)] #[clap(global_setting(AppSettings::DeriveDisplayOrder))] @@ -864,10 +864,10 @@ impl Pruner { // header size. 
let tree_size_after_prune = self.stats.size[BlobType::Tree].total_after_prune() + self.stats.blobs[BlobType::Tree].total_after_prune() - * IndexPack::HEADER_LEN_COMPRESSED as u64; + * HeaderEntry::ENTRY_LEN_COMPRESSED as u64; let data_size_after_prune = self.stats.size[BlobType::Data].total_after_prune() + self.stats.blobs[BlobType::Data].total_after_prune() - * IndexPack::HEADER_LEN_COMPRESSED as u64; + * HeaderEntry::ENTRY_LEN_COMPRESSED as u64; let mut tree_repacker = Repacker::new( be.clone(), diff --git a/src/id.rs b/src/id.rs index 4a1efa5..6f08d5f 100644 --- a/src/id.rs +++ b/src/id.rs @@ -1,6 +1,6 @@ use std::fmt; -use binrw::BinWrite; +use binrw::{BinRead, BinWrite}; use derive_more::{Constructor, Display}; use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; @@ -19,6 +19,7 @@ use thiserror::Error; Serialize, Deserialize, BinWrite, + BinRead, Display, )] #[display(fmt = "{}", "&self.to_hex()[0..8]")] diff --git a/src/index/binarysorted.rs b/src/index/binarysorted.rs index 061b355..7ce978a 100644 --- a/src/index/binarysorted.rs +++ b/src/index/binarysorted.rs @@ -3,7 +3,7 @@ use std::num::NonZeroU32; use super::{BlobType, IndexEntry, ReadIndex}; use crate::blob::BlobTypeMap; use crate::id::Id; -use crate::repo::IndexPack; +use crate::repo::{IndexBlob, IndexPack}; #[derive(Debug, PartialEq, Eq)] struct SortedEntry { @@ -20,6 +20,7 @@ pub(crate) enum IndexType { OnlyTrees, } +#[derive(Debug)] enum EntriesVariants { None, Ids(Vec), @@ -42,11 +43,20 @@ pub(crate) struct TypeIndexCollector { #[derive(Default)] pub(crate) struct IndexCollector(BlobTypeMap); +pub struct PackIndexes { + c: Index, + tpe: BlobType, + idx: BlobTypeMap<(usize, usize)>, +} + +#[derive(Debug)] pub(crate) struct TypeIndex { packs: Vec, entries: EntriesVariants, total_size: u64, } + +#[derive(Debug)] pub struct Index(BlobTypeMap); impl IndexCollector { @@ -67,7 +77,7 @@ impl IndexCollector { &self.0[BlobType::Tree].packs } - // Turns Collector into an index by sorting the 
entries. + // Turns Collector into an index by sorting the entries by ID. pub fn into_index(self) -> Index { Index(self.0.map(|_, mut tc| { match &mut tc.entries { @@ -125,6 +135,74 @@ impl Extend for IndexCollector { } } +impl Iterator for PackIndexes { + type Item = IndexPack; + + fn next(&mut self) -> Option { + let (pack_idx, idx) = loop { + let (pack_idx, idx) = &mut self.idx[self.tpe]; + if *pack_idx >= self.c.0[self.tpe].packs.len() { + if self.tpe == BlobType::Data { + return None; + } else { + self.tpe = BlobType::Data; + } + } else { + break (pack_idx, idx); + } + }; + + let mut pack = IndexPack::default(); + pack.set_id(self.c.0[self.tpe].packs[*pack_idx]); + + if let EntriesVariants::FullEntries(entries) = &self.c.0[self.tpe].entries { + while *idx < entries.len() && entries[*idx].pack_idx == *pack_idx { + let entry = &entries[*idx]; + pack.blobs.push(IndexBlob { + id: entry.id, + tpe: self.tpe, + offset: entry.offset, + length: entry.length, + uncompressed_length: entry.uncompressed_length, + }); + *idx += 1; + } + } + *pack_idx += 1; + + Some(pack) + } +} + +impl IntoIterator for Index { + type Item = IndexPack; + type IntoIter = PackIndexes; + + // Turns the Index into an iterator yielding IndexPack by sorting the entries by pack. 
+ fn into_iter(mut self) -> Self::IntoIter { + for (_, tc) in self.0.iter_mut() { + if let EntriesVariants::FullEntries(entries) = &mut tc.entries { + entries.sort_unstable_by(|e1, e2| e1.pack_idx.cmp(&e2.pack_idx)) + } + } + PackIndexes { + c: Index(self.0.map(|_, mut tc| { + if let EntriesVariants::FullEntries(entries) = &mut tc.entries { + entries.sort_unstable_by(|e1, e2| e1.pack_idx.cmp(&e2.pack_idx)) + } + + TypeIndex { + packs: tc.packs, + entries: tc.entries, + total_size: tc.total_size, + } + })), + tpe: BlobType::Tree, + idx: BlobTypeMap::default(), + } + } +} + impl ReadIndex for Index { fn get_id(&self, blob_type: &BlobType, id: &Id) -> Option { let vec = match &self.0[*blob_type].entries { diff --git a/src/index/mod.rs b/src/index/mod.rs index a1308e1..5673a43 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -145,6 +145,10 @@ impl IndexBackend { pub async fn only_full_trees(be: &BE, p: ProgressBar) -> Result { Self::new_from_collector(be, p, IndexCollector::new(IndexType::FullTrees)).await } + + pub fn into_index(self) -> Index { + Arc::try_unwrap(self.index).expect("index still in use") + } } impl IndexedBackend for IndexBackend { diff --git a/src/repo/indexfile.rs b/src/repo/indexfile.rs index bbeb708..3e576e5 100644 --- a/src/repo/indexfile.rs +++ b/src/repo/indexfile.rs @@ -8,6 +8,8 @@ use crate::backend::{FileType, RepoFile}; use crate::blob::BlobType; use crate::id::Id; +use super::PackHeaderRef; + #[derive(Debug, Default, Serialize, Deserialize)] pub struct IndexFile { #[serde(skip_serializing_if = "Option::is_none")] @@ -42,15 +44,6 @@ pub struct IndexPack { } impl IndexPack { - // 4 equals the size of blob::packer::PackHeaderLength - // 32 equals the size of the crypto overhead - pub const PACK_OVERHEAD: u32 = 4 + 32; - - // this equals the size of blob::packer::PackHeaderEntry - pub const HEADER_LEN: u32 = 37; - // this equals the size of blob::packer::PackHeaderEntryComp - pub const HEADER_LEN_COMPRESSED: u32 = 41; - pub fn 
set_id(&mut self, id: Id) { self.id = id; } @@ -74,15 +67,8 @@ impl IndexPack { // calculate the pack size from the contained blobs pub fn pack_size(&self) -> u32 { - self.size.unwrap_or_else(|| { - self.blobs.iter().fold(Self::PACK_OVERHEAD, |acc, blob| { - acc + blob.length - + match blob.uncompressed_length { - None => Self::HEADER_LEN, - Some(_) => Self::HEADER_LEN_COMPRESSED, - } - }) - }) + self.size + .unwrap_or_else(|| PackHeaderRef::from_index_pack(self).pack_size()) } /// returns the blob type of the pack. Note that only packs with diff --git a/src/repo/mod.rs b/src/repo/mod.rs index ae445af..ec07538 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -1,10 +1,12 @@ mod configfile; mod indexfile; mod keyfile; +mod packfile; mod snapshotfile; pub use super::id::*; pub use configfile::*; pub use indexfile::*; pub use keyfile::*; +pub use packfile::*; pub use snapshotfile::*; diff --git a/src/repo/packfile.rs b/src/repo/packfile.rs new file mode 100644 index 0000000..64a42d9 --- /dev/null +++ b/src/repo/packfile.rs @@ -0,0 +1,191 @@ +use std::num::NonZeroU32; + +use anyhow::Result; +use binrw::{io::Cursor, BinRead, BinWrite}; + +use crate::blob::BlobType; +use crate::id::Id; + +use super::{IndexBlob, IndexPack}; + +// 32 equals the size of the crypto overhead +// TODO: use from crypto mod +pub const COMP_OVERHEAD: u32 = 32; + +pub const LENGTH_LEN: u32 = 4; +#[derive(BinWrite, BinRead)] +#[brw(little)] +pub struct PackHeaderLength(u32); + +impl PackHeaderLength { + pub fn from_u32(len: u32) -> Self { + Self(len) + } + + pub fn to_u32(&self) -> u32 { + self.0 + } + + /// Read pack header length from binary representation + pub fn from_binary(data: &[u8]) -> Result { + let mut reader = Cursor::new(data); + Ok(PackHeaderLength::read(&mut reader)?) 
+ } + + /// generate the binary representation of the pack header length + pub fn to_binary(&self) -> Result> { + let mut writer = Cursor::new(Vec::with_capacity(4)); + self.write(&mut writer)?; + Ok(writer.into_inner()) + } +} + +#[derive(BinRead, BinWrite)] +#[brw(little)] +pub enum HeaderEntry { + #[brw(magic(0u8))] + Data { len: u32, id: Id }, + + #[brw(magic(1u8))] + Tree { len: u32, id: Id }, + + #[brw(magic(2u8))] + CompData { len: u32, len_data: u32, id: Id }, + + #[brw(magic(3u8))] + CompTree { len: u32, len_data: u32, id: Id }, +} + +impl HeaderEntry { + pub const ENTRY_LEN: u32 = 37; + pub const ENTRY_LEN_COMPRESSED: u32 = 41; + + fn from_blob(blob: &IndexBlob) -> Self { + match (blob.uncompressed_length, blob.tpe) { + (None, BlobType::Data) => Self::Data { + len: blob.length, + id: blob.id, + }, + (None, BlobType::Tree) => Self::Tree { + len: blob.length, + id: blob.id, + }, + (Some(len), BlobType::Data) => Self::CompData { + len: blob.length, + len_data: len.get(), + id: blob.id, + }, + (Some(len), BlobType::Tree) => Self::CompTree { + len: blob.length, + len_data: len.get(), + id: blob.id, + }, + } + } + + // the length of this header entry + fn length(&self) -> u32 { + match &self { + Self::Data { len: _, id: _ } => Self::ENTRY_LEN, + Self::Tree { len: _, id: _ } => Self::ENTRY_LEN, + Self::CompData { + len: _, + len_data: _, + id: _, + } => Self::ENTRY_LEN_COMPRESSED, + Self::CompTree { + len: _, + len_data: _, + id: _, + } => Self::ENTRY_LEN_COMPRESSED, + } + } + + fn into_blob(self, offset: u32) -> IndexBlob { + match self { + Self::Data { len, id } => IndexBlob { + id, + length: len, + tpe: BlobType::Data, + uncompressed_length: None, + offset, + }, + Self::Tree { len, id } => IndexBlob { + id, + length: len, + tpe: BlobType::Tree, + uncompressed_length: None, + offset, + }, + Self::CompData { len, id, len_data } => IndexBlob { + id, + length: len, + tpe: BlobType::Data, + uncompressed_length: NonZeroU32::new(len_data), + offset, + }, + 
Self::CompTree { len, id, len_data } => IndexBlob { + id, + length: len, + tpe: BlobType::Tree, + uncompressed_length: NonZeroU32::new(len_data), + offset, + }, + } + } +} + +pub struct PackHeader(Vec); +impl PackHeader { + /// Read the binary representation of the pack header + pub fn from_binary(pack: &[u8]) -> Result { + let mut reader = Cursor::new(pack); + let mut offset = 0; + let mut blobs = Vec::new(); + loop { + let blob = match HeaderEntry::read(&mut reader) { + Ok(entry) => entry.into_blob(offset), + Err(err) if err.is_eof() => break, + Err(err) => return Err(err.into()), + }; + offset += blob.length; + blobs.push(blob); + } + Ok(Self(blobs)) + } + + pub fn into_blobs(self) -> Vec { + self.0 + } +} + +pub struct PackHeaderRef<'a>(&'a [IndexBlob]); +impl<'a> PackHeaderRef<'a> { + pub fn from_index_pack(pack: &'a IndexPack) -> Self { + Self(&pack.blobs) + } + + // calculate the pack header size from the contained blobs + pub fn size(&self) -> u32 { + self.0.iter().fold(COMP_OVERHEAD, |acc, blob| { + acc + HeaderEntry::from_blob(blob).length() + }) + } + + // calculate the pack size from the contained blobs + pub fn pack_size(&self) -> u32 { + self.0.iter().fold(COMP_OVERHEAD + LENGTH_LEN, |acc, blob| { + acc + blob.length + HeaderEntry::from_blob(blob).length() + }) + } + + /// generate the binary representation of the pack header + pub fn to_binary(&self) -> Result> { + let mut writer = Cursor::new(Vec::with_capacity(self.pack_size() as usize)); + // collect header entries + for blob in self.0 { + HeaderEntry::from_blob(blob).write(&mut writer)?; + } + Ok(writer.into_inner()) + } +} diff --git a/test-bats/simple.bats b/test-bats/simple.bats index 6ed6e9a..2d13eb8 100644 --- a/test-bats/simple.bats +++ b/test-bats/simple.bats @@ -49,7 +49,7 @@ teardown () { assert_success assert_output -p "2 snapshot(s)" - run $RUSTIC check + run $RUSTIC check --read-data assert_success }