mirror of
https://github.com/rustic-rs/rustic.git
synced 2025-10-26 11:18:51 +00:00
check: Add check for valid cache files
This commit is contained in:
parent
fadda788c8
commit
caca900972
@ -24,6 +24,10 @@ impl<BE: WriteBackend> CachedBackend<BE> {
|
||||
cache: Cache::new(id, create).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cache(&self) -> &Option<Cache> {
|
||||
&self.cache
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -127,7 +131,7 @@ impl<BE: WriteBackend> WriteBackend for CachedBackend<BE> {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Cache {
|
||||
pub struct Cache {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
@ -160,7 +164,7 @@ impl Cache {
|
||||
self.path.join(tpe.name()).join(&hex_id[0..2]).join(&hex_id)
|
||||
}
|
||||
|
||||
async fn list_with_size(&self, tpe: FileType) -> Result<HashMap<Id, u32>> {
|
||||
pub async fn list_with_size(&self, tpe: FileType) -> Result<HashMap<Id, u32>> {
|
||||
let path = self.path.join(tpe.name());
|
||||
|
||||
let walker = WalkDir::new(path)
|
||||
@ -192,6 +196,7 @@ impl Cache {
|
||||
// TODO: this function is yet only called from list_with_size. This cleans up
|
||||
// index and snapshot files.
|
||||
// It should also be called when reading the index to clean up pack files.
|
||||
|
||||
pub async fn remove_not_in_list(&self, tpe: FileType, list: &Vec<(Id, u32)>) -> Result<()> {
|
||||
let mut list_cache = self.list_with_size(tpe).await?;
|
||||
// remove present files from the cache list
|
||||
@ -210,7 +215,7 @@ impl Cache {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
|
||||
pub async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
|
||||
v3!("cache reading tpe: {:?}, id: {}", &tpe, &id);
|
||||
let data = fs::read(self.path(tpe, id))?;
|
||||
v3!("cache hit!");
|
||||
|
||||
@ -2,11 +2,12 @@ use std::collections::HashMap;
|
||||
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use futures::TryStreamExt;
|
||||
use futures::{stream::FuturesUnordered, TryStreamExt};
|
||||
use tokio::spawn;
|
||||
use vlog::*;
|
||||
|
||||
use super::progress_counter;
|
||||
use crate::backend::{DecryptReadBackend, FileType};
|
||||
use super::{progress_bytes, progress_counter};
|
||||
use crate::backend::{Cache, DecryptReadBackend, FileType, ReadBackend};
|
||||
use crate::blob::{NodeType, TreeStreamerOnce};
|
||||
use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend};
|
||||
use crate::repo::{IndexFile, IndexPack, SnapshotFile};
|
||||
@ -18,10 +19,33 @@ pub(super) struct Opts {
|
||||
read_data: bool,
|
||||
}
|
||||
|
||||
pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) -> Result<()> {
|
||||
pub(super) async fn execute(
|
||||
be: &(impl DecryptReadBackend + Unpin),
|
||||
cache: &Option<Cache>,
|
||||
raw_be: &impl ReadBackend,
|
||||
opts: Opts,
|
||||
) -> Result<()> {
|
||||
if let Some(cache) = &cache {
|
||||
v1!("checking snapshots and index in cache...");
|
||||
for file_type in [FileType::Snapshot, FileType::Index] {
|
||||
// list files in order to clean up the cache
|
||||
//
|
||||
// This lists files here and later when reading index / checking snapshots
|
||||
// TODO: Only list the files once...
|
||||
let _ = be.list_with_size(file_type).await?;
|
||||
|
||||
check_cache_files(cache, raw_be, file_type).await?;
|
||||
}
|
||||
}
|
||||
|
||||
v1!("checking packs in index and from pack list...");
|
||||
let index_collector = check_packs(be).await?;
|
||||
|
||||
if let Some(cache) = &cache {
|
||||
v1!("checking packs in cache...");
|
||||
check_cache_files(cache, raw_be, FileType::Pack).await?;
|
||||
}
|
||||
|
||||
let be = IndexBackend::new_from_index(be, index_collector.into_index());
|
||||
|
||||
v1!("checking snapshots and trees...");
|
||||
@ -34,6 +58,49 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_cache_files(
|
||||
cache: &Cache,
|
||||
be: &impl ReadBackend,
|
||||
file_type: FileType,
|
||||
) -> Result<()> {
|
||||
let files = cache.list_with_size(file_type).await?;
|
||||
|
||||
if files.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let p = progress_bytes();
|
||||
let total_size = files.iter().map(|(_, size)| *size as u64).sum();
|
||||
p.set_length(total_size);
|
||||
|
||||
let stream: FuturesUnordered<_> = files
|
||||
.into_iter()
|
||||
.map(|(id, size)| {
|
||||
let cache = cache.clone();
|
||||
let be = be.clone();
|
||||
let p = p.clone();
|
||||
spawn(async move {
|
||||
// Read file from cache and from backend and compare
|
||||
// TODO: Use (Async)Readers and compare using them!
|
||||
let data_cached = cache.read_full(file_type, &id).await.unwrap();
|
||||
let data = be.read_full(file_type, &id).await.unwrap();
|
||||
if data_cached != data {
|
||||
eprintln!(
|
||||
"Cached file Type: {:?}, Id: {} is not identical to backend!",
|
||||
file_type, id
|
||||
);
|
||||
}
|
||||
p.inc(size as u64);
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
stream.try_collect().await?;
|
||||
|
||||
p.finish();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// check if packs correspond to index
|
||||
async fn check_packs(be: &impl DecryptReadBackend) -> Result<IndexCollector> {
|
||||
let mut packs = HashMap::new();
|
||||
|
||||
@ -114,16 +114,17 @@ pub async fn execute() -> Result<()> {
|
||||
|
||||
let config_ids = be.list(FileType::Config).await?;
|
||||
|
||||
let (cmd, key, dbe, config) = match (args.command, config_ids.len()) {
|
||||
let (cmd, key, dbe, cache, be, config) = match (args.command, config_ids.len()) {
|
||||
(Command::Init(opts), 0) => return init::execute(&be, opts).await,
|
||||
(Command::Init(_), _) => bail!("Config file already exists. Aborting."),
|
||||
(cmd, 1) => {
|
||||
let key = get_key(&be, args.password_file).await?;
|
||||
let dbe = DecryptBackend::new(&be, key.clone());
|
||||
let config: ConfigFile = dbe.get_file(&config_ids[0]).await?;
|
||||
let be = CachedBackend::new(be, config.id, !args.no_cache);
|
||||
let dbe = DecryptBackend::new(&be, key.clone());
|
||||
(cmd, key, dbe, config)
|
||||
let be_cached = CachedBackend::new(be.clone(), config.id, !args.no_cache);
|
||||
let cache = be_cached.cache().clone();
|
||||
let dbe = DecryptBackend::new(&be_cached, key.clone());
|
||||
(cmd, key, dbe, cache, be, config)
|
||||
}
|
||||
(_, 0) => bail!("No config file found. Is there a repo?"),
|
||||
_ => bail!("More than one config file. Aborting."),
|
||||
@ -132,7 +133,7 @@ pub async fn execute() -> Result<()> {
|
||||
match cmd {
|
||||
Command::Backup(opts) => backup::execute(&dbe, opts, config, command).await?,
|
||||
Command::Cat(opts) => cat::execute(&dbe, opts).await?,
|
||||
Command::Check(opts) => check::execute(&dbe, opts).await?,
|
||||
Command::Check(opts) => check::execute(&dbe, &cache, &be, opts).await?,
|
||||
Command::Diff(opts) => diff::execute(&dbe, opts).await?,
|
||||
Command::Forget(opts) => forget::execute(&dbe, opts, config).await?,
|
||||
Command::Init(_) => {} // already handled above
|
||||
|
||||
Loading…
Reference in New Issue
Block a user