diff --git a/src/commands/helpers.rs b/src/commands/helpers.rs index e70a82d..12b8a5b 100644 --- a/src/commands/helpers.rs +++ b/src/commands/helpers.rs @@ -2,18 +2,22 @@ use std::fmt::Write; use std::fs::File; use std::io::BufReader; use std::path::PathBuf; +use std::process::Command; use std::time::Duration; use anyhow::{bail, Result}; use bytesize::ByteSize; +use futures::{stream::FuturesUnordered, TryStreamExt}; use indicatif::HumanDuration; use indicatif::{ProgressBar, ProgressState, ProgressStyle}; use rpassword::{prompt_password, read_password_from_bufread}; +use tokio::spawn; +use tokio::time::sleep; use vlog::*; -use crate::backend::ReadBackend; +use crate::backend::{DecryptReadBackend, FileType, ReadBackend}; use crate::crypto::Key; -use crate::repo::find_key_in_backend; +use crate::repo::{find_key_in_backend, Id}; const MAX_PASSWORD_RETRIES: usize = 5; @@ -82,3 +86,55 @@ pub fn progress_bytes() -> ProgressBar { ProgressBar::hidden() } } + +pub fn warm_up_command(packs: Vec, command: &str) -> Result<()> { + for pack in packs { + let id = pack.to_hex(); + let actual_command = command.replace("%id", &id); + v1!("calling {actual_command}..."); + let mut commands: Vec<_> = actual_command.split(' ').collect(); + let status = Command::new(commands[0]) + .args(&mut commands[1..]) + .status()?; + if !status.success() { + bail!("warm-up command was not successful for pack {id}. {status}"); + } + } + Ok(()) +} + +pub async fn warm_up(be: &impl DecryptReadBackend, packs: Vec) -> Result<()> { + let mut be = be.clone(); + be.set_option("retry", "false")?; + + let p = progress_counter(); + p.set_length(packs.len() as u64); + let mut stream = FuturesUnordered::new(); + + const MAX_READER: usize = 20; + for pack in packs { + while stream.len() > MAX_READER { + stream.try_next().await?; + } + + let p = p.clone(); + let be = be.clone(); + stream.push(spawn(async move { + // ignore errors as they are expected from the warm-up + _ = be.read_partial(FileType::Pack, &pack, false, 0, 1).await; + p.inc(1); + })) + } + + stream.try_collect().await?; + p.finish(); + + Ok(()) +} + +pub async fn wait(d: Option) { + if let Some(wait) = d { + v1!("waiting {}...", wait); + sleep(*wait).await; + } +} diff --git a/src/commands/restore.rs b/src/commands/restore.rs index 2079dbf..9e90842 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use std::io::Read; use std::num::NonZeroU32; use std::path::{Path, PathBuf}; -use std::process::Command; use anyhow::{anyhow, bail, Result}; use clap::Parser; @@ -13,7 +12,7 @@ use ignore::{DirEntry, WalkBuilder}; use tokio::spawn; use vlog::*; -use super::{bytes, progress_bytes, progress_counter}; +use super::{bytes, progress_bytes, progress_counter, wait, warm_up, warm_up_command}; use crate::backend::{DecryptReadBackend, FileType, LocalBackend}; use crate::blob::{Node, NodeStreamer, NodeType, Tree}; use crate::crypto::hash; @@ -28,13 +27,17 @@ pub(super) struct Opts { dry_run: bool, /// warm up needed data pack files by only requesting them without processing - #[clap(long, short = 'n', requires = "dry-run")] + #[clap(long)] warm_up: bool, /// warm up needed data pack files by running the command with %id replaced by pack id - #[clap(long, short = 'n', requires = "dry-run", conflicts_with = "warm-up")] + #[clap(long, conflicts_with = "warm-up")] warm_up_command: Option, + /// duration (e.g. 10m) to wait after warm up before doing the actual restore + #[clap(long, value_name = "DURATION", conflicts_with = "dry-run")] + warm_up_wait: Option, + /// remove all files/dirs destination which are not contained in snapshot. /// Warning: Use with care, maybe first try this with --dry-run? #[clap(long)] @@ -80,15 +83,22 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) if file_infos.total_size == file_infos.matched_size { v1!("all file contents are fine."); - } else if opts.warm_up { - v1!("warming up needed data pack files..."); - warm_up(be, file_infos).await?; - } else if opts.warm_up_command.is_some() { - v1!("warming up needed data pack files..."); - warm_up_command(file_infos, opts.warm_up_command.as_ref().unwrap())?; - } else if !opts.dry_run { - v1!("restoring missing file contents..."); - restore_contents(be, &dest, file_infos).await?; + } else { + if opts.warm_up { + v1!("warming up needed data pack files..."); + warm_up(be, file_infos.to_packs()).await?; + } else if opts.warm_up_command.is_some() { + v1!("warming up needed data pack files..."); + warm_up_command( + file_infos.to_packs(), + opts.warm_up_command.as_ref().unwrap(), + )?; + } + wait(opts.warm_up_wait).await; + if !opts.dry_run { + v1!("restoring missing file contents..."); + restore_contents(be, &dest, file_infos).await?; + } } if !opts.dry_run { @@ -233,52 +243,6 @@ async fn allocate_and_collect( Ok(file_infos) } -fn warm_up_command(file_infos: FileInfos, command: &str) -> Result<()> { - for pack in file_infos.into_packs() { - let id = pack.to_hex(); - let actual_command = command.replace("%id", &id); - v1!("calling {actual_command}..."); - let mut commands: Vec<_> = actual_command.split(' ').collect(); - let status = Command::new(commands[0]) - .args(&mut commands[1..]) - .status()?; - if !status.success() { - bail!("warm-up command was not successful for pack {id}. {status}"); - } - } - Ok(()) -} - -async fn warm_up(be: &impl DecryptReadBackend, file_infos: FileInfos) -> Result<()> { - let packs = file_infos.into_packs(); - let mut be = be.clone(); - be.set_option("retry", "false")?; - - let p = progress_counter(); - p.set_length(packs.len() as u64); - let mut stream = FuturesUnordered::new(); - - const MAX_READER: usize = 20; - for pack in packs { - while stream.len() > MAX_READER { - stream.try_next().await?; - } - - let p = p.clone(); - let be = be.clone(); - stream.push(spawn(async move { - // ignore errors as they are expected from the warm-up - _ = be.read_partial(FileType::Pack, &pack, false, 0, 1).await; - p.inc(1); - })) - } - - stream.try_collect().await?; - p.finish(); - - Ok(()) -} - /// restore_contents restores all files contents as described by file_infos /// using the ReadBackend be and writing them into the LocalBackend dest. async fn restore_contents( @@ -509,12 +473,12 @@ impl FileInfos { Ok(open_file.is_none().then(|| file_pos)) } - // filter out packs which we need - fn into_packs(self) -> Vec { + fn to_packs(&self) -> Vec { self.r - .into_iter() + .iter() + // filter out packs which we need .filter(|(_, blob)| blob.iter().any(|(_, fls)| fls.iter().all(|fl| !fl.matches))) - .map(|(pack, _)| pack) + .map(|(pack, _)| *pack) .collect() } }