restore: Add option warm-up-wait

This commit is contained in:
Alexander Weiss 2022-08-12 23:24:47 +02:00
parent b51297b7dc
commit 92df64cd80
2 changed files with 85 additions and 65 deletions

View File

@ -2,18 +2,22 @@ use std::fmt::Write;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
use anyhow::{bail, Result};
use bytesize::ByteSize;
use futures::{stream::FuturesUnordered, TryStreamExt};
use indicatif::HumanDuration;
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use rpassword::{prompt_password, read_password_from_bufread};
use tokio::spawn;
use tokio::time::sleep;
use vlog::*;
use crate::backend::ReadBackend;
use crate::backend::{DecryptReadBackend, FileType, ReadBackend};
use crate::crypto::Key;
use crate::repo::find_key_in_backend;
use crate::repo::{find_key_in_backend, Id};
const MAX_PASSWORD_RETRIES: usize = 5;
@ -82,3 +86,55 @@ pub fn progress_bytes() -> ProgressBar {
ProgressBar::hidden()
}
}
pub fn warm_up_command(packs: Vec<Id>, command: &str) -> Result<()> {
for pack in packs {
let id = pack.to_hex();
let actual_command = command.replace("%id", &id);
v1!("calling {actual_command}...");
let mut commands: Vec<_> = actual_command.split(' ').collect();
let status = Command::new(commands[0])
.args(&mut commands[1..])
.status()?;
if !status.success() {
bail!("warm-up command was not successful for pack {id}. {status}");
}
}
Ok(())
}
pub async fn warm_up(be: &impl DecryptReadBackend, packs: Vec<Id>) -> Result<()> {
let mut be = be.clone();
be.set_option("retry", "false")?;
let p = progress_counter();
p.set_length(packs.len() as u64);
let mut stream = FuturesUnordered::new();
const MAX_READER: usize = 20;
for pack in packs {
while stream.len() > MAX_READER {
stream.try_next().await?;
}
let p = p.clone();
let be = be.clone();
stream.push(spawn(async move {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1).await;
p.inc(1);
}))
}
stream.try_collect().await?;
p.finish();
Ok(())
}
pub async fn wait(d: Option<humantime::Duration>) {
if let Some(wait) = d {
v1!("waiting {}...", wait);
sleep(*wait).await;
}
}

View File

@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::io::Read;
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::process::Command;
use anyhow::{anyhow, bail, Result};
use clap::Parser;
@ -13,7 +12,7 @@ use ignore::{DirEntry, WalkBuilder};
use tokio::spawn;
use vlog::*;
use super::{bytes, progress_bytes, progress_counter};
use super::{bytes, progress_bytes, progress_counter, wait, warm_up, warm_up_command};
use crate::backend::{DecryptReadBackend, FileType, LocalBackend};
use crate::blob::{Node, NodeStreamer, NodeType, Tree};
use crate::crypto::hash;
@ -28,13 +27,17 @@ pub(super) struct Opts {
dry_run: bool,
/// warm up needed data pack files by only requesting them without processing
#[clap(long, short = 'n', requires = "dry-run")]
#[clap(long)]
warm_up: bool,
/// warm up needed data pack files by running the command with %id replaced by pack id
#[clap(long, short = 'n', requires = "dry-run", conflicts_with = "warm-up")]
#[clap(long, conflicts_with = "warm-up")]
warm_up_command: Option<String>,
/// duration (e.g. 10m) to wait after warm up before doing the actual restore
#[clap(long, value_name = "DURATION", conflicts_with = "dry-run")]
warm_up_wait: Option<humantime::Duration>,
/// remove all files/dirs destination which are not contained in snapshot.
/// Warning: Use with care, maybe first try this with --dry-run?
#[clap(long)]
@ -80,15 +83,22 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
if file_infos.total_size == file_infos.matched_size {
v1!("all file contents are fine.");
} else if opts.warm_up {
v1!("warming up needed data pack files...");
warm_up(be, file_infos).await?;
} else if opts.warm_up_command.is_some() {
v1!("warming up needed data pack files...");
warm_up_command(file_infos, opts.warm_up_command.as_ref().unwrap())?;
} else if !opts.dry_run {
v1!("restoring missing file contents...");
restore_contents(be, &dest, file_infos).await?;
} else {
if opts.warm_up {
v1!("warming up needed data pack files...");
warm_up(be, file_infos.to_packs()).await?;
} else if opts.warm_up_command.is_some() {
v1!("warming up needed data pack files...");
warm_up_command(
file_infos.to_packs(),
opts.warm_up_command.as_ref().unwrap(),
)?;
}
wait(opts.warm_up_wait).await;
if !opts.dry_run {
v1!("restoring missing file contents...");
restore_contents(be, &dest, file_infos).await?;
}
}
if !opts.dry_run {
@ -233,52 +243,6 @@ async fn allocate_and_collect(
Ok(file_infos)
}
fn warm_up_command(file_infos: FileInfos, command: &str) -> Result<()> {
for pack in file_infos.into_packs() {
let id = pack.to_hex();
let actual_command = command.replace("%id", &id);
v1!("calling {actual_command}...");
let mut commands: Vec<_> = actual_command.split(' ').collect();
let status = Command::new(commands[0])
.args(&mut commands[1..])
.status()?;
if !status.success() {
bail!("warm-up command was not successful for pack {id}. {status}");
}
}
Ok(())
}
async fn warm_up(be: &impl DecryptReadBackend, file_infos: FileInfos) -> Result<()> {
let packs = file_infos.into_packs();
let mut be = be.clone();
be.set_option("retry", "false")?;
let p = progress_counter();
p.set_length(packs.len() as u64);
let mut stream = FuturesUnordered::new();
const MAX_READER: usize = 20;
for pack in packs {
while stream.len() > MAX_READER {
stream.try_next().await?;
}
let p = p.clone();
let be = be.clone();
stream.push(spawn(async move {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1).await;
p.inc(1);
}))
}
stream.try_collect().await?;
p.finish();
Ok(())
}
/// restore_contents restores all files contents as described by file_infos
/// using the ReadBackend be and writing them into the LocalBackend dest.
async fn restore_contents(
@ -509,12 +473,12 @@ impl FileInfos {
Ok(open_file.is_none().then(|| file_pos))
}
// filter out packs which we need
fn into_packs(self) -> Vec<Id> {
fn to_packs(&self) -> Vec<Id> {
self.r
.into_iter()
.iter()
// filter out packs which we need
.filter(|(_, blob)| blob.iter().any(|(_, fls)| fls.iter().all(|fl| !fl.matches)))
.map(|(pack, _)| pack)
.map(|(pack, _)| *pack)
.collect()
}
}