Add repair index command

This commit is contained in:
Alexander Weiss 2022-10-07 14:32:49 +02:00
parent f550e09be9
commit 040aa2a336
9 changed files with 263 additions and 78 deletions

View File

@ -16,7 +16,18 @@ impl<T: DecryptWriteBackend + DecryptReadBackend> DecryptFullBackend for T {}
#[async_trait]
pub trait DecryptReadBackend: ReadBackend {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes>;
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>>;
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
let decrypted = self.decrypt(&self.read_full(tpe, id).await?)?;
Ok(match decrypted[0] {
b'{' | b'[' => decrypted, // not compressed
2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following
_ => bail!("not supported"),
}
.into())
}
async fn read_encrypted_partial(
&self,
tpe: FileType,
@ -25,7 +36,20 @@ pub trait DecryptReadBackend: ReadBackend {
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Bytes>;
) -> Result<Bytes> {
let mut data = self.decrypt(
&self
.read_partial(tpe, id, cacheable, offset, length)
.await?,
)?;
if let Some(length) = uncompressed_length {
data = decode_all(&*data).unwrap();
if data.len() != length.get() as usize {
bail!("length of uncompressed data does not match!");
}
}
Ok(data.into())
}
async fn get_file<F: RepoFile>(&self, id: &Id) -> Result<F> {
let data = self.read_encrypted_full(F::TYPE, id).await?;
@ -160,40 +184,8 @@ impl<R: WriteBackend, C: CryptoKey> DecryptWriteBackend for DecryptBackend<R, C>
#[async_trait]
impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
let decrypted = self
.key
.decrypt_data(&self.backend.read_full(tpe, id).await?)?;
Ok(match decrypted[0] {
b'{' | b'[' => decrypted, // not compressed
2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following
_ => bail!("not supported"),
}
.into())
}
async fn read_encrypted_partial(
&self,
tpe: FileType,
id: &Id,
cacheable: bool,
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Bytes> {
let mut data = self.key.decrypt_data(
&self
.backend
.read_partial(tpe, id, cacheable, offset, length)
.await?,
)?;
if let Some(length) = uncompressed_length {
data = decode_all(&*data).unwrap();
if data.len() != length.get() as usize {
bail!("length of uncompressed data does not match!");
}
}
Ok(data.into())
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
Ok(self.key.decrypt_data(data)?)
}
}

View File

@ -1,5 +1,3 @@
use std::num::NonZeroU32;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
@ -23,21 +21,8 @@ impl<BE: DecryptFullBackend> DryRunBackend<BE> {
#[async_trait]
impl<BE: DecryptFullBackend> DecryptReadBackend for DryRunBackend<BE> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
self.be.read_encrypted_full(tpe, id).await
}
async fn read_encrypted_partial(
&self,
tpe: FileType,
id: &Id,
cacheable: bool,
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Bytes> {
self.be
.read_encrypted_partial(tpe, id, cacheable, offset, length, uncompressed_length)
.await
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>> {
self.be.decrypt(data)
}
}

View File

@ -10,10 +10,10 @@ use tokio::task::spawn_blocking;
use zstd::stream::decode_all;
use super::{progress_bytes, progress_counter};
use crate::backend::{Cache, DecryptFullBackend, DecryptReadBackend, FileType, ReadBackend};
use crate::backend::{Cache, DecryptReadBackend, FileType, ReadBackend};
use crate::blob::{BlobType, NodeType, TreeStreamerOnce};
use crate::commands::helpers::progress_spinner;
use crate::crypto::{hash, CryptoKey};
use crate::crypto::hash;
use crate::id::Id;
use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend};
use crate::repo::{
@ -32,7 +32,7 @@ pub(super) struct Opts {
}
pub(super) async fn execute(
be: &(impl DecryptFullBackend + Unpin), // TODO: this should be a DecryptReadBackend; see check_pack()
be: &(impl DecryptReadBackend + Unpin),
cache: &Option<Cache>,
hot_be: &Option<impl ReadBackend>,
raw_be: &impl ReadBackend,
@ -254,16 +254,16 @@ async fn check_packs(
async fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap<Id, u32>) -> Result<()> {
for (id, size) in be.list_with_size(FileType::Pack).await? {
match packs.remove(&id) {
None => warn!("pack {id} not referenced in index"),
None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."),
Some(index_size) if index_size != size => {
error!("pack {id}: size computed by index: {index_size}, actual size: {size}",)
error!("pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'.")
}
_ => {} //everything ok
}
}
for (id, _) in packs {
error!("pack {id} is referenced by the index but not present!",);
error!("pack {id} is referenced by the index but not present! To repair: 'rustic repair index'.",);
}
Ok(())
}
@ -294,7 +294,7 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> {
if !index.has_data(id) {
error!(
"file {:?} blob {} is missig in index",
"file {:?} blob {} is missing in index",
path.join(node.name()),
id
);
@ -322,11 +322,7 @@ async fn check_snapshots(index: &(impl IndexedBackend + Unpin)) -> Result<()> {
Ok(())
}
fn check_pack(
be: &impl DecryptFullBackend, // TODO: this should be a DecryptReadBackend; we just additionally need the key() method
index_pack: IndexPack,
mut data: Bytes,
) -> Result<()> {
fn check_pack(be: &impl DecryptReadBackend, index_pack: IndexPack, mut data: Bytes) -> Result<()> {
let id = index_pack.id;
let size = index_pack.pack_size();
if data.len() != size as usize {
@ -352,9 +348,7 @@ fn check_pack(
}
// check header
let header = be
.key()
.decrypt_data(&data.split_off(data.len() - header_len as usize))?;
let header = be.decrypt(&data.split_off(data.len() - header_len as usize))?;
let pack_blobs = PackHeader::from_binary(&header)?.into_blobs();
let mut blobs = index_pack.blobs;
@ -369,9 +363,7 @@ fn check_pack(
// check blobs
for blob in blobs {
let blob_id = blob.id;
let mut blob_data = be
.key()
.decrypt_data(&data.split_to(blob.length as usize))?;
let mut blob_data = be.decrypt(&data.split_to(blob.length as usize))?;
// TODO: this is identical to backend/decrypt.rs; unify these two parts!
if let Some(length) = blob.uncompressed_length {

View File

@ -85,7 +85,7 @@ pub fn progress_bytes(prefix: impl Into<Cow<'static, str>>) -> ProgressBar {
p
}
pub fn warm_up_command(packs: Vec<Id>, command: &str) -> Result<()> {
pub fn warm_up_command(packs: impl ExactSizeIterator<Item = Id>, command: &str) -> Result<()> {
let p = progress_counter("warming up packs...");
p.set_length(packs.len() as u64);
for pack in packs {
@ -104,7 +104,10 @@ pub fn warm_up_command(packs: Vec<Id>, command: &str) -> Result<()> {
Ok(())
}
pub async fn warm_up(be: &impl DecryptReadBackend, packs: Vec<Id>) -> Result<()> {
pub async fn warm_up(
be: &impl DecryptReadBackend,
packs: impl ExactSizeIterator<Item = Id>,
) -> Result<()> {
let mut be = be.clone();
be.set_option("retry", "false")?;

View File

@ -30,6 +30,7 @@ mod key;
mod list;
mod ls;
mod prune;
mod repair;
mod repoinfo;
mod restore;
mod rustic_config;
@ -170,6 +171,9 @@ enum Command {
/// Restore a snapshot/path
Restore(restore::Opts),
/// Restore a snapshot/path
Repair(repair::Opts),
/// Show general information about the repository
Repoinfo(repoinfo::Opts),
@ -318,6 +322,7 @@ pub async fn execute() -> Result<()> {
Command::Snapshots(opts) => snapshots::execute(&dbe, opts, config_file).await?,
Command::Prune(opts) => prune::execute(&dbe, cache, opts, config, vec![]).await?,
Command::Restore(opts) => restore::execute(&dbe, opts).await?,
Command::Repair(opts) => repair::execute(&dbe, opts).await?,
Command::Repoinfo(opts) => repoinfo::execute(&dbe, &be_hot, opts).await?,
Command::Tag(opts) => tag::execute(&dbe, opts, config_file).await?,
};

View File

@ -168,14 +168,15 @@ pub(super) async fn execute(
pruner.print_stats();
if opts.warm_up {
warm_up(be, pruner.repack_packs()).await?;
warm_up(be, pruner.repack_packs().into_iter()).await?;
} else if opts.warm_up_command.is_some() {
warm_up_command(
pruner.repack_packs(),
pruner.repack_packs().into_iter(),
opts.warm_up_command.as_ref().unwrap(),
)?;
}
wait(opts.warm_up_wait).await;
if !opts.dry_run {
pruner.do_prune(be, opts, config).await?;
}

169
src/commands/repair.rs Normal file
View File

@ -0,0 +1,169 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use futures::TryStreamExt;
use log::*;
use std::collections::HashMap;
use crate::backend::{DecryptFullBackend, FileType};
use crate::index::Indexer;
use crate::repo::{IndexFile, IndexPack, PackHeader, PackHeaderRef};
use super::{progress_counter, progress_spinner, wait, warm_up, warm_up_command};
#[derive(Parser)]
pub(super) struct Opts {
#[clap(subcommand)]
command: Command,
}
#[derive(Subcommand)]
enum Command {
/// Repair the repository index
Index(IndexOpts),
}
#[derive(Default, Parser)]
struct IndexOpts {
// Only show what would be repaired
#[clap(long, short = 'n')]
dry_run: bool,
// Read all data packs, i.e. completely re-create the index
#[clap(long)]
read_all: bool,
/// Warm up needed data pack files by only requesting them without processing
#[clap(long)]
warm_up: bool,
/// Warm up needed data pack files by running the command with %id replaced by pack id
#[clap(long, conflicts_with = "warm-up")]
warm_up_command: Option<String>,
/// Duration (e.g. 10m) to wait after warm up before doing the actual restore
#[clap(long, value_name = "DURATION", conflicts_with = "dry-run")]
warm_up_wait: Option<humantime::Duration>,
}
pub(super) async fn execute(be: &impl DecryptFullBackend, opts: Opts) -> Result<()> {
match opts.command {
Command::Index(opt) => repair_index(be, opt).await,
}
}
async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<()> {
let p = progress_spinner("listing packs...");
let mut packs: HashMap<_, _> = be
.list_with_size(FileType::Pack)
.await?
.into_iter()
.collect();
p.finish();
let mut pack_read_header = Vec::new();
let mut process_pack = |p: IndexPack,
to_delete: bool,
new_index: &mut IndexFile,
changed: &mut bool| {
let index_size = p.pack_size();
let id = p.id;
match packs.remove(&id) {
None => {
// this pack either does not exist or was already indexed in another index file => remove from index!
*changed = true;
debug!("removing non-existing pack {id} from index");
}
Some(size) if index_size != size => {
// pack exists, but sizes do not
pack_read_header.push((
id,
to_delete,
Some(PackHeaderRef::from_index_pack(&p).size()),
size.max(index_size),
));
info!("pack {id}: size computed by index: {index_size}, actual size: {size}, will re-read header");
*changed = true;
}
_ => {
// pack in repo and index matches
if opts.read_all {
pack_read_header.push((
id,
to_delete,
Some(PackHeaderRef::from_index_pack(&p).size()),
index_size,
));
*changed = true
} else {
new_index.add(p, to_delete);
}
}
}
};
let p = progress_counter("reading index...");
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
while let Some(index) = stream.try_next().await? {
let mut new_index = IndexFile::default();
let mut changed = false;
let index_id = index.0;
let index = index.1;
for p in index.packs {
process_pack(p, false, &mut new_index, &mut changed);
}
for p in index.packs_to_delete {
process_pack(p, true, &mut new_index, &mut changed);
}
match (changed, opts.dry_run) {
(true, true) => info!("would have modified index file {index_id}"),
(true, false) => {
if !new_index.packs.is_empty() && !new_index.packs_to_delete.is_empty() {
be.save_file(&new_index).await?;
}
be.remove(FileType::Index, &index_id, true).await?;
}
(false, _) => {} // nothing to do
}
}
p.finish();
// process packs which are listed but not contained in the index
pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, false, None, size)));
if opts.warm_up {
warm_up(be, pack_read_header.iter().map(|(id, _, _, _)| *id)).await?;
if opts.dry_run {
return Ok(());
}
} else if opts.warm_up_command.is_some() {
warm_up_command(
pack_read_header.iter().map(|(id, _, _, _)| *id),
opts.warm_up_command.as_ref().unwrap(),
)?;
if opts.dry_run {
return Ok(());
}
}
wait(opts.warm_up_wait).await;
let indexer = Indexer::new(be.clone()).into_shared();
let p = progress_counter("reading pack headers");
p.set_length(pack_read_header.len().try_into()?);
for (id, to_delete, size_hint, packsize) in pack_read_header {
debug!("reading pack {id}...");
let mut pack = IndexPack::default();
pack.set_id(id);
pack.blobs = PackHeader::from_file(be, id, size_hint, packsize)
.await?
.into_blobs();
if !opts.dry_run {
indexer.write().await.add_with(pack, to_delete).await?;
}
p.inc(1);
}
indexer.write().await.finalize().await?;
p.finish();
Ok(())
}

View File

@ -89,10 +89,10 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
info!("all file contents are fine.");
} else {
if opts.warm_up {
warm_up(be, file_infos.to_packs()).await?;
warm_up(be, file_infos.to_packs().into_iter()).await?;
} else if opts.warm_up_command.is_some() {
warm_up_command(
file_infos.to_packs(),
file_infos.to_packs().into_iter(),
opts.warm_up_command.as_ref().unwrap(),
)?;
}

View File

@ -3,8 +3,9 @@ use std::num::NonZeroU32;
use anyhow::Result;
use binrw::{io::Cursor, BinRead, BinWrite};
use crate::blob::BlobType;
use crate::backend::FileType;
use crate::id::Id;
use crate::{backend::DecryptReadBackend, blob::BlobType};
use super::{IndexBlob, IndexPack};
@ -154,6 +155,43 @@ impl PackHeader {
Ok(Self(blobs))
}
/// Read the pack header directly from a packfile using the backend
pub async fn from_file(
be: &impl DecryptReadBackend,
id: Id,
size_hint: Option<u32>,
pack_size: u32,
) -> Result<Self> {
// guess the header size from size_hint and pack_size
// If the guess is too small, we have to re-read. If the guess is too large, we have to have read too much
// but this should normally not matter too much. So we try to overguess here...
let size_guess = size_hint.unwrap_or(0);
// read (guessed) header + length field
let read_size = size_guess + LENGTH_LEN;
let offset = pack_size - read_size;
let mut data = be
.read_partial(FileType::Pack, &id, false, offset, read_size)
.await?;
// get header length from the file
let size_real =
PackHeaderLength::from_binary(&data.split_off(size_guess as usize))?.to_u32();
// now read the header
let data = if size_real <= size_guess {
// header was alread read
data.split_off((size_guess - size_real) as usize)
} else {
// size_guess was too small; we have to read again
let offset = pack_size - size_real - LENGTH_LEN;
be.read_partial(FileType::Pack, &id, false, offset, size_real)
.await?
};
Self::from_binary(&be.decrypt(&data)?)
}
pub fn into_blobs(self) -> Vec<IndexBlob> {
self.0
}