diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index 7ee367b..3d13ddd 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -16,7 +16,18 @@ impl DecryptFullBackend for T {} #[async_trait] pub trait DecryptReadBackend: ReadBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result; + fn decrypt(&self, data: &[u8]) -> Result>; + + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result { + let decrypted = self.decrypt(&self.read_full(tpe, id).await?)?; + Ok(match decrypted[0] { + b'{' | b'[' => decrypted, // not compressed + 2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following + _ => bail!("not supported"), + } + .into()) + } + async fn read_encrypted_partial( &self, tpe: FileType, @@ -25,7 +36,20 @@ pub trait DecryptReadBackend: ReadBackend { offset: u32, length: u32, uncompressed_length: Option, - ) -> Result; + ) -> Result { + let mut data = self.decrypt( + &self + .read_partial(tpe, id, cacheable, offset, length) + .await?, + )?; + if let Some(length) = uncompressed_length { + data = decode_all(&*data).unwrap(); + if data.len() != length.get() as usize { + bail!("length of uncompressed data does not match!"); + } + } + Ok(data.into()) + } async fn get_file(&self, id: &Id) -> Result { let data = self.read_encrypted_full(F::TYPE, id).await?; @@ -160,40 +184,8 @@ impl DecryptWriteBackend for DecryptBackend #[async_trait] impl DecryptReadBackend for DecryptBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result { - let decrypted = self - .key - .decrypt_data(&self.backend.read_full(tpe, id).await?)?; - Ok(match decrypted[0] { - b'{' | b'[' => decrypted, // not compressed - 2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following - _ => bail!("not supported"), - } - .into()) - } - - async fn read_encrypted_partial( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - offset: u32, - length: u32, - uncompressed_length: Option, - ) -> Result { - let mut data = self.key.decrypt_data( - &self - .backend - .read_partial(tpe, id, cacheable, offset, length) - .await?, - )?; - if let Some(length) = uncompressed_length { - data = decode_all(&*data).unwrap(); - if data.len() != length.get() as usize { - bail!("length of uncompressed data does not match!"); - } - } - Ok(data.into()) + fn decrypt(&self, data: &[u8]) -> Result> { + Ok(self.key.decrypt_data(data)?) } } diff --git a/src/backend/dry_run.rs b/src/backend/dry_run.rs index d18fd22..4d57cab 100644 --- a/src/backend/dry_run.rs +++ b/src/backend/dry_run.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU32; - use anyhow::Result; use async_trait::async_trait; use bytes::Bytes; @@ -23,21 +21,8 @@ impl DryRunBackend { #[async_trait] impl DecryptReadBackend for DryRunBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result { - self.be.read_encrypted_full(tpe, id).await - } - async fn read_encrypted_partial( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - offset: u32, - length: u32, - uncompressed_length: Option, - ) -> Result { - self.be - .read_encrypted_partial(tpe, id, cacheable, offset, length, uncompressed_length) - .await + fn decrypt(&self, data: &[u8]) -> Result> { + self.be.decrypt(data) } } diff --git a/src/commands/check.rs b/src/commands/check.rs index e66148a..f6b5a3c 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -10,10 +10,10 @@ use tokio::task::spawn_blocking; use zstd::stream::decode_all; use super::{progress_bytes, progress_counter}; -use crate::backend::{Cache, DecryptFullBackend, DecryptReadBackend, FileType, ReadBackend}; +use crate::backend::{Cache, DecryptReadBackend, FileType, ReadBackend}; use crate::blob::{BlobType, NodeType, TreeStreamerOnce}; use crate::commands::helpers::progress_spinner; -use crate::crypto::{hash, CryptoKey}; +use crate::crypto::hash; use crate::id::Id; use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend}; use crate::repo::{ @@ -32,7 +32,7 @@ pub(super) struct Opts { } pub(super) async fn execute( - be: &(impl DecryptFullBackend + Unpin), // TODO: this should be a DecryptReadBackend; see check_pack() + be: &(impl DecryptReadBackend + Unpin), cache: &Option, hot_be: &Option, raw_be: &impl ReadBackend, @@ -254,16 +254,16 @@ async fn check_packs( async fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap) -> Result<()> { for (id, size) in be.list_with_size(FileType::Pack).await? { match packs.remove(&id) { - None => warn!("pack {id} not referenced in index"), + None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."), Some(index_size) if index_size != size => { - error!("pack {id}: size computed by index: {index_size}, actual size: {size}",) + error!("pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'.") } _ => {} //everything ok } } for (id, _) in packs { - error!("pack {id} is referenced by the index but not present!",); + error!("pack {id} is referenced by the index but not present! To repair: 'rustic repair index'.",); } Ok(()) } @@ -294,7 +294,7 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> { if !index.has_data(id) { error!( - "file {:?} blob {} is missig in index", + "file {:?} blob {} is missing in index", path.join(node.name()), id ); @@ -322,11 +322,7 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> { Ok(()) } -fn check_pack( - be: &impl DecryptFullBackend, // TODO: this should be a DecryptReadBackend; we just additionally need the key() method - index_pack: IndexPack, - mut data: Bytes, -) -> Result<()> { +fn check_pack(be: &impl DecryptReadBackend, index_pack: IndexPack, mut data: Bytes) -> Result<()> { let id = index_pack.id; let size = index_pack.pack_size(); if data.len() != size as usize { @@ -352,9 +348,7 @@ fn check_pack( } // check header - let header = be - .key() - .decrypt_data(&data.split_off(data.len() - header_len as usize))?; + let header = be.decrypt(&data.split_off(data.len() - header_len as usize))?; let pack_blobs = PackHeader::from_binary(&header)?.into_blobs(); let mut blobs = index_pack.blobs; @@ -369,9 +363,7 @@ fn check_pack( // check blobs for blob in blobs { let blob_id = blob.id; - let mut blob_data = be - .key() - .decrypt_data(&data.split_to(blob.length as usize))?; + let mut blob_data = be.decrypt(&data.split_to(blob.length as usize))?; // TODO: this is identical to backend/decrypt.rs; unify these two parts! if let Some(length) = blob.uncompressed_length { diff --git a/src/commands/helpers.rs b/src/commands/helpers.rs index 7339462..103a083 100644 --- a/src/commands/helpers.rs +++ b/src/commands/helpers.rs @@ -85,7 +85,7 @@ pub fn progress_bytes(prefix: impl Into>) -> ProgressBar { p } -pub fn warm_up_command(packs: Vec, command: &str) -> Result<()> { +pub fn warm_up_command(packs: impl ExactSizeIterator, command: &str) -> Result<()> { let p = progress_counter("warming up packs..."); p.set_length(packs.len() as u64); for pack in packs { @@ -104,7 +104,10 @@ pub fn warm_up_command(packs: Vec, command: &str) -> Result<()> { Ok(()) } -pub async fn warm_up(be: &impl DecryptReadBackend, packs: Vec) -> Result<()> { +pub async fn warm_up( + be: &impl DecryptReadBackend, + packs: impl ExactSizeIterator, +) -> Result<()> { let mut be = be.clone(); be.set_option("retry", "false")?; diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 5b55189..5dd2230 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -30,6 +30,7 @@ mod key; mod list; mod ls; mod prune; +mod repair; mod repoinfo; mod restore; mod rustic_config; @@ -170,6 +171,9 @@ enum Command { /// Restore a snapshot/path Restore(restore::Opts), + /// Restore a snapshot/path + Repair(repair::Opts), + /// Show general information about the repository Repoinfo(repoinfo::Opts), @@ -318,6 +322,7 @@ pub async fn execute() -> Result<()> { Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file).await?, Command::Prune(opts) => prune::execute(&dbe, cache, opts, config, vec![]).await?, Command::Restore(opts) => restore::execute(&dbe, opts).await?, + Command::Repair(opts) => repair::execute(&dbe, opts).await?, Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts).await?, Command::Tag(opts) => tag::execute(&dbe, opts, config_file).await?, }; diff --git a/src/commands/prune.rs b/src/commands/prune.rs index 1315f81..44e93c3 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -168,14 +168,15 @@ pub(super) async fn execute( pruner.print_stats(); if opts.warm_up { - warm_up(be, pruner.repack_packs()).await?; + warm_up(be, pruner.repack_packs().into_iter()).await?; } else if opts.warm_up_command.is_some() { warm_up_command( - pruner.repack_packs(), + pruner.repack_packs().into_iter(), opts.warm_up_command.as_ref().unwrap(), )?; } wait(opts.warm_up_wait).await; + if !opts.dry_run { pruner.do_prune(be, opts, config).await?; } diff --git a/src/commands/repair.rs b/src/commands/repair.rs new file mode 100644 index 0000000..7b47ec4 --- /dev/null +++ b/src/commands/repair.rs @@ -0,0 +1,169 @@ +use anyhow::Result; +use clap::{Parser, Subcommand}; +use futures::TryStreamExt; +use log::*; +use std::collections::HashMap; + +use crate::backend::{DecryptFullBackend, FileType}; +use crate::index::Indexer; +use crate::repo::{IndexFile, IndexPack, PackHeader, PackHeaderRef}; + +use super::{progress_counter, progress_spinner, wait, warm_up, warm_up_command}; + +#[derive(Parser)] +pub(super) struct Opts { + #[clap(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Repair the repository index + Index(IndexOpts), +} + +#[derive(Default, Parser)] +struct IndexOpts { + // Only show what would be repaired + #[clap(long, short = 'n')] + dry_run: bool, + + // Read all data packs, i.e. completely re-create the index + #[clap(long)] + read_all: bool, + + /// Warm up needed data pack files by only requesting them without processing + #[clap(long)] + warm_up: bool, + + /// Warm up needed data pack files by running the command with %id replaced by pack id + #[clap(long, conflicts_with = "warm-up")] + warm_up_command: Option, + + /// Duration (e.g. 10m) to wait after warm up before doing the actual restore + #[clap(long, value_name = "DURATION", conflicts_with = "dry-run")] + warm_up_wait: Option, +} + +pub(super) async fn execute(be: &impl DecryptFullBackend, opts: Opts) -> Result<()> { + match opts.command { + Command::Index(opt) => repair_index(be, opt).await, + } +} + +async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<()> { + let p = progress_spinner("listing packs..."); + let mut packs: HashMap<_, _> = be + .list_with_size(FileType::Pack) + .await? + .into_iter() + .collect(); + p.finish(); + + let mut pack_read_header = Vec::new(); + + let mut process_pack = |p: IndexPack, + to_delete: bool, + new_index: &mut IndexFile, + changed: &mut bool| { + let index_size = p.pack_size(); + let id = p.id; + match packs.remove(&id) { + None => { + // this pack either does not exist or was already indexed in another index file => remove from index! + *changed = true; + debug!("removing non-existing pack {id} from index"); + } + Some(size) if index_size != size => { + // pack exists, but sizes do not + pack_read_header.push(( + id, + to_delete, + Some(PackHeaderRef::from_index_pack(&p).size()), + size.max(index_size), + )); + info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header"); + *changed = true; + } + _ => { + // pack in repo and index matches + if opts.read_all { + pack_read_header.push(( + id, + to_delete, + Some(PackHeaderRef::from_index_pack(&p).size()), + index_size, + )); + *changed = true + } else { + new_index.add(p, to_delete); + } + } + } + }; + + let p = progress_counter("reading index..."); + let mut stream = be.stream_all::(p.clone()).await?; + while let Some(index) = stream.try_next().await? { + let mut new_index = IndexFile::default(); + let mut changed = false; + let index_id = index.0; + let index = index.1; + for p in index.packs { + process_pack(p, false, &mut new_index, &mut changed); + } + for p in index.packs_to_delete { + process_pack(p, true, &mut new_index, &mut changed); + } + match (changed, opts.dry_run) { + (true, true) => info!("would have modified index file {index_id}"), + (true, false) => { + if !new_index.packs.is_empty() && !new_index.packs_to_delete.is_empty() { + be.save_file(&new_index).await?; + } + be.remove(FileType::Index, &index_id, true).await?; + } + (false, _) => {} // nothing to do + } + } + p.finish(); + + // process packs which are listed but not contained in the index + pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, false, None, size))); + + if opts.warm_up { + warm_up(be, pack_read_header.iter().map(|(id, _, _, _)| *id)).await?; + if opts.dry_run { + return Ok(()); + } + } else if opts.warm_up_command.is_some() { + warm_up_command( + pack_read_header.iter().map(|(id, _, _, _)| *id), + opts.warm_up_command.as_ref().unwrap(), + )?; + if opts.dry_run { + return Ok(()); + } + } + wait(opts.warm_up_wait).await; + + let indexer = Indexer::new(be.clone()).into_shared(); + let p = progress_counter("reading pack headers"); + p.set_length(pack_read_header.len().try_into()?); + for (id, to_delete, size_hint, packsize) in pack_read_header { + debug!("reading pack {id}..."); + let mut pack = IndexPack::default(); + pack.set_id(id); + pack.blobs = PackHeader::from_file(be, id, size_hint, packsize) + .await? + .into_blobs(); + if !opts.dry_run { + indexer.write().await.add_with(pack, to_delete).await?; + } + p.inc(1); + } + indexer.write().await.finalize().await?; + p.finish(); + + Ok(()) +} diff --git a/src/commands/restore.rs b/src/commands/restore.rs index cb542e1..2671532 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -89,10 +89,10 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) info!("all file contents are fine."); } else { if opts.warm_up { - warm_up(be, file_infos.to_packs()).await?; + warm_up(be, file_infos.to_packs().into_iter()).await?; } else if opts.warm_up_command.is_some() { warm_up_command( - file_infos.to_packs(), + file_infos.to_packs().into_iter(), opts.warm_up_command.as_ref().unwrap(), )?; } diff --git a/src/repo/packfile.rs b/src/repo/packfile.rs index 64a42d9..8b6a4c1 100644 --- a/src/repo/packfile.rs +++ b/src/repo/packfile.rs @@ -3,8 +3,9 @@ use std::num::NonZeroU32; use anyhow::Result; use binrw::{io::Cursor, BinRead, BinWrite}; -use crate::blob::BlobType; +use crate::backend::FileType; use crate::id::Id; +use crate::{backend::DecryptReadBackend, blob::BlobType}; use super::{IndexBlob, IndexPack}; @@ -154,6 +155,43 @@ impl PackHeader { Ok(Self(blobs)) } + /// Read the pack header directly from a packfile using the backend + pub async fn from_file( + be: &impl DecryptReadBackend, + id: Id, + size_hint: Option, + pack_size: u32, + ) -> Result { + // guess the header size from size_hint and pack_size + // If the guess is too small, we have to re-read. If the guess is too large, we have to have read too much + // but this should normally not matter too much. So we try to overguess here... + let size_guess = size_hint.unwrap_or(0); + + // read (guessed) header + length field + let read_size = size_guess + LENGTH_LEN; + let offset = pack_size - read_size; + let mut data = be + .read_partial(FileType::Pack, &id, false, offset, read_size) + .await?; + + // get header length from the file + let size_real = + PackHeaderLength::from_binary(&data.split_off(size_guess as usize))?.to_u32(); + + // now read the header + let data = if size_real <= size_guess { + // header was alread read + data.split_off((size_guess - size_real) as usize) + } else { + // size_guess was too small; we have to read again + let offset = pack_size - size_real - LENGTH_LEN; + be.read_partial(FileType::Pack, &id, false, offset, size_real) + .await? + }; + + Self::from_binary(&be.decrypt(&data)?) + } + pub fn into_blobs(self) -> Vec { self.0 }