From 565cb11369ff6e4c0b256469c0e5abacc0a45819 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Mon, 8 Aug 2022 20:42:51 +0200 Subject: [PATCH 1/6] packer: don't use temporary files --- Cargo.toml | 2 +- src/backend/cache.rs | 41 +++++++++-------------------------------- src/backend/choose.rs | 24 ++++++++++-------------- src/backend/decrypt.rs | 17 +++++++++-------- src/backend/dry_run.rs | 18 ++++++++---------- src/backend/hotcold.rs | 28 ++++++++++------------------ src/backend/local.rs | 19 +++---------------- src/backend/mod.rs | 10 +++++++--- src/backend/rclone.rs | 14 ++++++++------ src/backend/rest.rs | 20 ++++++++------------ src/blob/packer.rs | 17 +++++++---------- src/commands/init.rs | 5 +++-- src/commands/key.rs | 2 +- 13 files changed, 84 insertions(+), 133 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3e6afed..be3c892 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ serde_json = "1" serde-aux = "3" # other dependencies chrono = { version = "0.4", features = ["serde"] } -tempfile = "3" zstd = "0.11" enum-map = "2" enum-map-derive = "0.10" @@ -59,6 +58,7 @@ filetime = "0.2" reqwest = {version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } # rclone backend sha1 = "0.10" +tempfile = "3" # cache dirs = "4" cachedir = "0.3" diff --git a/src/backend/cache.rs b/src/backend/cache.rs index b64ea74..d22fe91 100644 --- a/src/backend/cache.rs +++ b/src/backend/cache.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::fs::{self, File}; -use std::io::{copy, Read, Seek, SeekFrom, Write}; +use std::io::{Read, Seek, SeekFrom, Write}; use std::path::PathBuf; use anyhow::{anyhow, Result}; @@ -97,24 +97,19 @@ impl WriteBackend for CachedBackend { self.be.create().await } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, mut f: File) -> Result<()> { + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { if let Some(cache) = &self.cache { if cacheable || tpe.is_cacheable() { - let f_cache = f.try_clone()?; - let _ = cache.write_file(tpe, id, cacheable, f_cache).await; - f.seek(SeekFrom::Start(0))?; - } - } - self.be.write_file(tpe, id, cacheable, f).await - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - if let Some(cache) = &self.cache { - if tpe.is_cacheable() { let _ = cache.write_bytes(tpe, id, buf.clone()).await; } } - self.be.write_bytes(tpe, id, buf).await + self.be.write_bytes(tpe, id, cacheable, buf).await } async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { @@ -239,24 +234,6 @@ impl Cache { Ok(vec) } - async fn write_file( - &self, - tpe: FileType, - id: &Id, - _cacheable: bool, - mut f: File, - ) -> Result<()> { - v3!("cache writing tpe: {:?}, id: {}", &tpe, &id); - fs::create_dir_all(self.dir(tpe, id))?; - let filename = self.path(tpe, id); - let mut file = fs::OpenOptions::new() - .create(true) - .write(true) - .open(&filename)?; - copy(&mut f, &mut file)?; - Ok(()) - } - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { v3!("cache writing tpe: {:?}, id: {}", &tpe, &id); fs::create_dir_all(self.dir(tpe, id))?; diff --git a/src/backend/choose.rs b/src/backend/choose.rs index 4ceced4..a9ecb97 100644 --- a/src/backend/choose.rs +++ b/src/backend/choose.rs @@ -1,5 +1,3 @@ -use std::fs::File; - use anyhow::{bail, Result}; use async_trait::async_trait; @@ -83,19 +81,17 @@ impl WriteBackend for ChooseBackend { } } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> { + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { match self { - Local(local) => local.write_file(tpe, id, cacheable, f).await, - Rest(rest) => rest.write_file(tpe, id, cacheable, f).await, - Rclone(rclone) => rclone.write_file(tpe, id, cacheable, f).await, - } - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - match self { - Local(local) => local.write_bytes(tpe, id, buf).await, - Rest(rest) => rest.write_bytes(tpe, id, buf).await, - Rclone(rclone) => rclone.write_bytes(tpe, id, buf).await, + Local(local) => local.write_bytes(tpe, id, cacheable, buf).await, + Rest(rest) => rest.write_bytes(tpe, id, cacheable, buf).await, + Rclone(rclone) => rclone.write_bytes(tpe, id, cacheable, buf).await, } } diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index 0c01410..3cb6d58 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::num::NonZeroU32; use anyhow::{bail, Result}; @@ -149,7 +148,7 @@ impl DecryptWriteBackend for DecryptBackend None => self.key().encrypt_data(data)?, }; let id = hash(&data); - self.write_bytes(tpe, &id, data).await?; + self.write_bytes(tpe, &id, false, data).await?; Ok(id) } @@ -234,12 +233,14 @@ impl WriteBackend for DecryptBackend { self.backend.create().await } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> { - self.backend.write_file(tpe, id, cacheable, f).await - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - self.backend.write_bytes(tpe, id, buf).await + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { + self.backend.write_bytes(tpe, id, cacheable, buf).await } async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { diff --git a/src/backend/dry_run.rs b/src/backend/dry_run.rs index 1371fe6..964bd23 100644 --- a/src/backend/dry_run.rs +++ b/src/backend/dry_run.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::num::NonZeroU32; use anyhow::Result; @@ -101,17 +100,16 @@ impl WriteBackend for DryRunBackend { } } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> { + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { match self.dry_run { true => Ok(()), - false => self.be.write_file(tpe, id, cacheable, f).await, - } - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - match self.dry_run { - true => Ok(()), - false => self.be.write_bytes(tpe, id, buf).await, + false => self.be.write_bytes(tpe, id, cacheable, buf).await, } } diff --git a/src/backend/hotcold.rs b/src/backend/hotcold.rs index 94872ac..0e5f0ec 100644 --- a/src/backend/hotcold.rs +++ b/src/backend/hotcold.rs @@ -1,6 +1,3 @@ -use std::fs::File; -use std::io::{Seek, SeekFrom}; - use anyhow::Result; use async_trait::async_trait; @@ -60,24 +57,19 @@ impl WriteBackend for HotColdBackend { self.be.create().await } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, mut f: File) -> Result<()> { + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { if let Some(be) = &self.hot_be { - if cacheable || tpe != FileType::Pack { - let f_hot = f.try_clone()?; - be.write_file(tpe, id, cacheable, f_hot).await?; - f.seek(SeekFrom::Start(0))?; + if tpe != FileType::Config && (cacheable || tpe != FileType::Pack) { + be.write_bytes(tpe, id, cacheable, buf.clone()).await?; } } - self.be.write_file(tpe, id, cacheable, f).await - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - if let Some(be) = &self.hot_be { - if tpe != FileType::Config { - be.write_bytes(tpe, id, buf.clone()).await?; - } - } - self.be.write_bytes(tpe, id, buf).await + self.be.write_bytes(tpe, id, cacheable, buf).await } async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { diff --git a/src/backend/local.rs b/src/backend/local.rs index 26cd940..005c618 100644 --- a/src/backend/local.rs +++ b/src/backend/local.rs @@ -1,5 +1,5 @@ use std::fs::{self, File}; -use std::io::{copy, Read, Seek, SeekFrom, Write}; +use std::io::{Read, Seek, SeekFrom, Write}; use std::os::unix::fs::{symlink, FileExt, PermissionsExt}; use std::path::{Path, PathBuf}; @@ -131,26 +131,13 @@ impl WriteBackend for LocalBackend { Ok(()) } - async fn write_file( + async fn write_bytes( &self, tpe: FileType, id: &Id, _cacheable: bool, - mut f: File, + buf: Vec, ) -> Result<()> { - v3!("writing tpe: {:?}, id: {}", &tpe, &id); - let filename = self.path(tpe, id); - let mut file = fs::OpenOptions::new() - .create(true) - .write(true) - .open(&filename)?; - file.set_len(0)?; - copy(&mut f, &mut file)?; - file.sync_all()?; - Ok(()) - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { v3!("writing tpe: {:?}, id: {}", &tpe, &id); let filename = self.path(tpe, id); let mut file = fs::OpenOptions::new() diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 61f1f28..e4ef0c8 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -1,4 +1,3 @@ -use std::fs::File; use std::io::Read; use std::path::{Path, PathBuf}; @@ -147,8 +146,13 @@ pub trait ReadBackend: Clone + Send + Sync + 'static { #[async_trait] pub trait WriteBackend: ReadBackend { async fn create(&self) -> Result<()>; - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()>; - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()>; + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()>; async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>; } diff --git a/src/backend/rclone.rs b/src/backend/rclone.rs index 064026f..6e0a917 100644 --- a/src/backend/rclone.rs +++ b/src/backend/rclone.rs @@ -150,12 +150,14 @@ impl WriteBackend for RcloneBackend { self.rest.create().await } - async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> { - self.rest.write_file(tpe, id, cacheable, f).await - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { - self.rest.write_bytes(tpe, id, buf).await + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + cacheable: bool, + buf: Vec, + ) -> Result<()> { + self.rest.write_bytes(tpe, id, cacheable, buf).await } async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> { diff --git a/src/backend/rest.rs b/src/backend/rest.rs index 1863222..59d08cf 100644 --- a/src/backend/rest.rs +++ b/src/backend/rest.rs @@ -1,4 +1,4 @@ -use std::fs::File; +use std::time::Duration; use anyhow::Result; use async_trait::async_trait; @@ -136,17 +136,13 @@ impl WriteBackend for RestBackend { Ok(()) } - async fn write_file(&self, tpe: FileType, id: &Id, _cacheable: bool, f: File) -> Result<()> { - v3!("writing tpe: {:?}, id: {}", &tpe, &id); - self.client - .post(self.url(tpe, id)) - .body(tokio::fs::File::from_std(f)) - .send() - .await?; - Ok(()) - } - - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { + async fn write_bytes( + &self, + tpe: FileType, + id: &Id, + _cacheable: bool, + buf: Vec, + ) -> Result<()> { v3!("writing tpe: {:?}, id: {}", &tpe, &id); self.client.post(self.url(tpe, id)).body(buf).send().await?; Ok(()) diff --git a/src/blob/packer.rs b/src/blob/packer.rs index c2b400d..9cde696 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -1,13 +1,10 @@ use integer_sqrt::IntegerSquareRoot; -use std::fs::File; -use std::io::{Seek, SeekFrom, Write}; use std::num::NonZeroU32; use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Result}; use binrw::{io::Cursor, BinWrite}; use chrono::Local; -use tempfile::tempfile; use tokio::{spawn, task::JoinHandle}; use zstd::encode_all; @@ -53,7 +50,7 @@ impl PackSizer { pub struct Packer { be: BE, blob_type: BlobType, - file: File, + file: Vec, size: u32, count: u32, created: SystemTime, @@ -84,7 +81,7 @@ impl Packer { Ok(Self { be, blob_type, - file: tempfile()?, + file: Vec::new(), size: 0, count: 0, created: SystemTime::now(), @@ -104,7 +101,8 @@ impl Packer { pub async fn write_data(&mut self, data: &[u8]) -> Result { self.hasher.update(data); - let len = self.file.write(data)?.try_into()?; + let len = data.len().try_into()?; + self.file.extend_from_slice(data); self.size += len; Ok(len) } @@ -265,7 +263,7 @@ impl Packer { // write file to backend let index = std::mem::take(&mut self.index); - let file = std::mem::replace(&mut self.file, tempfile()?); + let file = std::mem::replace(&mut self.file, Vec::new()); self.file_writer.add(index, file, id).await?; Ok(()) @@ -284,13 +282,12 @@ struct FileWriter { } impl FileWriter { - async fn add(&mut self, mut index: IndexPack, mut file: File, id: Id) -> Result<()> { + async fn add(&mut self, mut index: IndexPack, file: Vec, id: Id) -> Result<()> { let be = self.be.clone(); let indexer = self.indexer.clone(); let cacheable = self.cacheable; let new_future = spawn(async move { - file.seek(SeekFrom::Start(0))?; - be.write_file(FileType::Pack, &id, cacheable, file).await?; + be.write_bytes(FileType::Pack, &id, cacheable, file).await?; index.time = Some(Local::now()); indexer.write().await.add(index).await?; Ok(()) diff --git a/src/commands/init.rs b/src/commands/init.rs index bae7421..ee79c78 100644 --- a/src/commands/init.rs +++ b/src/commands/init.rs @@ -63,11 +63,12 @@ pub(super) async fn execute( let data = serde_json::to_vec(&keyfile)?; let id = hash(&data); be.create().await?; - be.write_bytes(FileType::Key, &id, data.clone()).await?; + be.write_bytes(FileType::Key, &id, false, data.clone()) + .await?; if let Some(hot_be) = hot_be { hot_be.create().await?; - hot_be.write_bytes(FileType::Key, &id, data).await?; + hot_be.write_bytes(FileType::Key, &id, false, data).await?; } println!("key {} successfully added.", id); diff --git a/src/commands/key.rs b/src/commands/key.rs index 86da1db..a954eba 100644 --- a/src/commands/key.rs +++ b/src/commands/key.rs @@ -56,7 +56,7 @@ async fn add_key(be: &impl WriteBackend, key: Key, opts: AddOpts) -> Result<()> let keyfile = KeyFile::generate(key, &pass, opts.hostname, opts.username, opts.with_created)?; let data = serde_json::to_vec(&keyfile)?; let id = hash(&data); - be.write_bytes(FileType::Key, &id, data).await?; + be.write_bytes(FileType::Key, &id, false, data).await?; println!("key {} successfully added.", id); Ok(()) From 53f860fb4f51d0281f4f8c2e64609e716a1963d1 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sun, 7 Aug 2022 19:55:45 +0200 Subject: [PATCH 2/6] rest/rclone backend: Add retries --- Cargo.lock | 15 ++++ Cargo.toml | 1 + src/backend/rest.rs | 175 +++++++++++++++++++++++++++++--------------- 3 files changed, 130 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b473cf..7265106 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,20 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.7", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "base64" version = "0.13.0" @@ -1446,6 +1460,7 @@ dependencies = [ "ambassador", "anyhow", "async-trait", + "backoff", "base64", "binrw", "bytesize", diff --git a/Cargo.toml b/Cargo.toml index be3c892..f16ab8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ nix = "0.24" filetime = "0.2" # rest backend reqwest = {version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } +backoff = { version = "0.4", features = ["tokio"] } # rclone backend sha1 = "0.10" tempfile = "3" diff --git a/src/backend/rest.rs b/src/backend/rest.rs index 59d08cf..8e29d3d 100644 --- a/src/backend/rest.rs +++ b/src/backend/rest.rs @@ -2,6 +2,7 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use reqwest::{Client, Url}; use serde::Deserialize; use vlog::*; @@ -12,6 +13,12 @@ use super::{FileType, Id, ReadBackend, WriteBackend}; pub struct RestBackend { url: Url, client: Client, + backoff: ExponentialBackoff, +} + +// TODO for backoff: Handle transient vs permanent errors! +fn notify(err: reqwest::Error, duration: Duration) { + println!("Error {err} at {duration:?}, retrying"); } impl RestBackend { @@ -28,6 +35,9 @@ impl RestBackend { Self { url, client: Client::new(), + backoff: ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(Duration::from_secs(120))) + .build(), } } @@ -53,54 +63,68 @@ impl ReadBackend for RestBackend { } async fn list_with_size(&self, tpe: FileType) -> Result> { - if tpe == FileType::Config { - return Ok( - match self + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + if tpe == FileType::Config { + return Ok( + match self + .client + .head(self.url.join("config").unwrap()) + .send() + .await? + .status() + .is_success() + { + true => vec![(Id::default(), 0)], + false => Vec::new(), + }, + ); + } + + let mut path = tpe.name().to_string(); + path.push('/'); + let url = self.url.join(&path).unwrap(); + + // format which is delivered by the REST-service + #[derive(Deserialize)] + struct ListEntry { + name: Id, + size: u32, + } + + let list = self .client - .head(self.url.join("config").unwrap()) + .get(url) + .header("Accept", "application/vnd.x.restic.rest.v2") .send() .await? - .status() - .is_success() - { - true => vec![(Id::default(), 0)], - false => Vec::new(), - }, - ); - } - - let mut path = tpe.name().to_string(); - path.push('/'); - let url = self.url.join(&path).unwrap(); - - // format which is delivered by the REST-service - #[derive(Deserialize)] - struct ListEntry { - name: Id, - size: u32, - } - - let list = self - .client - .get(url) - .header("Accept", "application/vnd.x.restic.rest.v2") - .send() - .await? - .json::>() - .await?; - Ok(list.into_iter().map(|i| (i.name, i.size)).collect()) + .json::>() + .await?; + Ok(list.into_iter().map(|i| (i.name, i.size)).collect()) + }, + notify, + ) + .await?) } async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { - Ok(self - .client - .get(self.url(tpe, id)) - .send() - .await? - .bytes() - .await? - .into_iter() - .collect()) + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + Ok(self + .client + .get(self.url(tpe, id)) + .send() + .await? + .bytes() + .await? + .into_iter() + .collect()) + }, + notify, + ) + .await?) } async fn read_partial( @@ -113,27 +137,41 @@ impl ReadBackend for RestBackend { ) -> Result> { let offset2 = offset + length - 1; let header_value = format!("bytes={}-{}", offset, offset2); - Ok(self - .client - .get(self.url(tpe, id)) - .header("Range", header_value) - .send() - .await? - .bytes() - .await? - .into_iter() - .collect()) + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + Ok(self + .client + .get(self.url(tpe, id)) + .header("Range", header_value.clone()) + .send() + .await? + .bytes() + .await? + .into_iter() + .collect()) + }, + notify, + ) + .await?) } } #[async_trait] impl WriteBackend for RestBackend { async fn create(&self) -> Result<()> { - self.client - .post(self.url.join("?create=true").unwrap()) - .send() - .await?; - Ok(()) + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + self.client + .post(self.url.join("?create=true").unwrap()) + .send() + .await?; + Ok(()) + }, + notify, + ) + .await?) } async fn write_bytes( @@ -144,13 +182,28 @@ impl WriteBackend for RestBackend { buf: Vec, ) -> Result<()> { v3!("writing tpe: {:?}, id: {}", &tpe, &id); - self.client.post(self.url(tpe, id)).body(buf).send().await?; - Ok(()) + let req_builder = self.client.post(self.url(tpe, id)).body(buf); + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + req_builder.try_clone().unwrap().send().await?; + Ok(()) + }, + notify, + ) + .await?) } async fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> Result<()> { v3!("removing tpe: {:?}, id: {}", &tpe, &id); - self.client.delete(self.url(tpe, id)).send().await?; - Ok(()) + Ok(backoff::future::retry_notify( + self.backoff.clone(), + || async { + self.client.delete(self.url(tpe, id)).send().await?; + Ok(()) + }, + notify, + ) + .await?) } } From 952961007804ad43b37eeb60fd4b877e187004e4 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sun, 7 Aug 2022 21:58:13 +0200 Subject: [PATCH 3/6] use Bytes instead of Vec --- Cargo.lock | 1 + Cargo.toml | 1 + src/backend/cache.rs | 23 +++++++++-------------- src/backend/choose.rs | 13 ++++--------- src/backend/decrypt.rs | 28 ++++++++++++---------------- src/backend/dry_run.rs | 17 ++++++----------- src/backend/hotcold.rs | 13 ++++--------- src/backend/local.rs | 15 ++++++++------- src/backend/mod.rs | 15 +++++---------- src/backend/rclone.rs | 13 ++++--------- src/backend/rest.rs | 7 ++++--- src/blob/packer.rs | 11 ++++++----- src/commands/cat.rs | 6 +++--- src/commands/init.rs | 3 ++- src/commands/key.rs | 3 ++- src/index/mod.rs | 5 +++-- 16 files changed, 74 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7265106..3258f20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1463,6 +1463,7 @@ dependencies = [ "backoff", "base64", "binrw", + "bytes", "bytesize", "cachedir", "cdc", diff --git a/Cargo.toml b/Cargo.toml index f16ab8d..b607243 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde-aux = "3" # other dependencies +bytes = "1" chrono = { version = "0.4", features = ["serde"] } zstd = "0.11" enum-map = "2" diff --git a/src/backend/cache.rs b/src/backend/cache.rs index d22fe91..afb4c64 100644 --- a/src/backend/cache.rs +++ b/src/backend/cache.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::Bytes; use dirs::cache_dir; use vlog::*; use walkdir::WalkDir; @@ -41,7 +42,7 @@ impl ReadBackend for CachedBackend { Ok(list) } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { match (&self.cache, tpe.is_cacheable()) { (None, _) | (Some(_), false) => self.be.read_full(tpe, id).await, (Some(cache), true) => match cache.read_full(tpe, id).await { @@ -64,7 +65,7 @@ impl ReadBackend for CachedBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { match (&self.cache, cacheable || tpe.is_cacheable()) { (None, _) | (Some(_), false) => { self.be @@ -97,13 +98,7 @@ impl WriteBackend for CachedBackend { self.be.create().await } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { if let Some(cache) = &self.cache { if cacheable || tpe.is_cacheable() { let _ = cache.write_bytes(tpe, id, buf.clone()).await; @@ -206,11 +201,11 @@ impl Cache { Ok(()) } - pub async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + pub async fn read_full(&self, tpe: FileType, id: &Id) -> Result { v3!("cache reading tpe: {:?}, id: {}", &tpe, &id); let data = fs::read(self.path(tpe, id))?; v3!("cache hit!"); - Ok(data) + Ok(data.into()) } async fn read_partial( @@ -219,7 +214,7 @@ impl Cache { id: &Id, offset: u32, length: u32, - ) -> Result> { + ) -> Result { v3!( "cache reading tpe: {:?}, id: {}, offset: {}", &tpe, @@ -231,10 +226,10 @@ impl Cache { let mut vec = vec![0; length as usize]; file.read_exact(&mut vec)?; v3!("cache hit!"); - Ok(vec) + Ok(vec.into()) } - async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Bytes) -> Result<()> { v3!("cache writing tpe: {:?}, id: {}", &tpe, &id); fs::create_dir_all(self.dir(tpe, id))?; let filename = self.path(tpe, id); diff --git a/src/backend/choose.rs b/src/backend/choose.rs index a9ecb97..a0297b2 100644 --- a/src/backend/choose.rs +++ b/src/backend/choose.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Result}; use async_trait::async_trait; +use bytes::Bytes; use super::{FileType, Id, ReadBackend, WriteBackend}; use super::{LocalBackend, RcloneBackend, RestBackend}; @@ -43,7 +44,7 @@ impl ReadBackend for ChooseBackend { } } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { match self { Local(local) => local.read_full(tpe, id).await, Rest(rest) => rest.read_full(tpe, id).await, @@ -58,7 +59,7 @@ impl ReadBackend for ChooseBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { match self { Local(local) => local.read_partial(tpe, id, cacheable, offset, length).await, Rest(rest) => rest.read_partial(tpe, id, cacheable, offset, length).await, @@ -81,13 +82,7 @@ impl WriteBackend for ChooseBackend { } } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { match self { Local(local) => local.write_bytes(tpe, id, cacheable, buf).await, Rest(rest) => rest.write_bytes(tpe, id, cacheable, buf).await, diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index 3cb6d58..e4d9b07 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -2,6 +2,7 @@ use std::num::NonZeroU32; use anyhow::{bail, Result}; use async_trait::async_trait; +use bytes::Bytes; use futures::{stream, stream::FuturesUnordered, StreamExt}; use indicatif::ProgressBar; use tokio::{spawn, task::JoinHandle}; @@ -15,7 +16,7 @@ impl DecryptFullBackend for T {} #[async_trait] pub trait DecryptReadBackend: ReadBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result>; + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result; async fn read_encrypted_partial( &self, tpe: FileType, @@ -24,7 +25,7 @@ pub trait DecryptReadBackend: ReadBackend { offset: u32, length: u32, uncompressed_length: Option, - ) -> Result>; + ) -> Result; async fn get_file(&self, id: &Id) -> Result { let data = self.read_encrypted_full(F::TYPE, id).await?; @@ -148,7 +149,7 @@ impl DecryptWriteBackend for DecryptBackend None => self.key().encrypt_data(data)?, }; let id = hash(&data); - self.write_bytes(tpe, &id, false, data).await?; + self.write_bytes(tpe, &id, false, data.into()).await?; Ok(id) } @@ -159,7 +160,7 @@ impl DecryptWriteBackend for DecryptBackend #[async_trait] impl DecryptReadBackend for DecryptBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result { let decrypted = self .key .decrypt_data(&self.backend.read_full(tpe, id).await?)?; @@ -167,7 +168,8 @@ impl DecryptReadBackend for DecryptBackend { b'{' | b'[' => decrypted, // not compressed 2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following _ => bail!("not supported"), - }) + } + .into()) } async fn read_encrypted_partial( @@ -178,7 +180,7 @@ impl DecryptReadBackend for DecryptBackend { offset: u32, length: u32, uncompressed_length: Option, - ) -> Result> { + ) -> Result { let mut data = self.key.decrypt_data( &self .backend @@ -191,7 +193,7 @@ impl DecryptReadBackend for DecryptBackend { bail!("length of uncompressed data does not match!"); } } - Ok(data) + Ok(data.into()) } } @@ -209,7 +211,7 @@ impl ReadBackend for DecryptBackend { self.backend.list_with_size(tpe).await } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { self.backend.read_full(tpe, id).await } @@ -220,7 +222,7 @@ impl ReadBackend for DecryptBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { self.backend .read_partial(tpe, id, cacheable, offset, length) .await @@ -233,13 +235,7 @@ impl WriteBackend for DecryptBackend { self.backend.create().await } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { self.backend.write_bytes(tpe, id, cacheable, buf).await } diff --git a/src/backend/dry_run.rs b/src/backend/dry_run.rs index 964bd23..6c557a9 100644 --- a/src/backend/dry_run.rs +++ b/src/backend/dry_run.rs @@ -2,6 +2,7 @@ use std::num::NonZeroU32; use anyhow::Result; use async_trait::async_trait; +use bytes::Bytes; use super::{ DecryptFullBackend, DecryptReadBackend, DecryptWriteBackend, FileType, Id, ReadBackend, @@ -22,7 +23,7 @@ impl DryRunBackend { #[async_trait] impl DecryptReadBackend for DryRunBackend { - async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result { self.be.read_encrypted_full(tpe, id).await } async fn read_encrypted_partial( @@ -33,7 +34,7 @@ impl DecryptReadBackend for DryRunBackend { offset: u32, length: u32, uncompressed_length: Option, - ) -> Result> { + ) -> Result { self.be .read_encrypted_partial(tpe, id, cacheable, offset, length, uncompressed_length) .await @@ -50,7 +51,7 @@ impl ReadBackend for DryRunBackend { self.be.list_with_size(tpe).await } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { self.be.read_full(tpe, id).await } @@ -61,7 +62,7 @@ impl ReadBackend for DryRunBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { self.be .read_partial(tpe, id, cacheable, offset, length) .await @@ -100,13 +101,7 @@ impl WriteBackend for DryRunBackend { } } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { match self.dry_run { true => Ok(()), false => self.be.write_bytes(tpe, id, cacheable, buf).await, diff --git a/src/backend/hotcold.rs b/src/backend/hotcold.rs index 0e5f0ec..0792168 100644 --- a/src/backend/hotcold.rs +++ b/src/backend/hotcold.rs @@ -1,5 +1,6 @@ use anyhow::Result; use async_trait::async_trait; +use bytes::Bytes; use super::{FileType, Id, ReadBackend, WriteBackend}; @@ -25,7 +26,7 @@ impl ReadBackend for HotColdBackend { self.be.list_with_size(tpe).await } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { match &self.hot_be { None => self.be.read_full(tpe, id).await, Some(be) => be.read_full(tpe, id).await, @@ -39,7 +40,7 @@ impl ReadBackend for HotColdBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { match (&self.hot_be, cacheable || tpe != FileType::Pack) { (None, _) | (Some(_), false) => { self.be @@ -57,13 +58,7 @@ impl WriteBackend for HotColdBackend { self.be.create().await } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { if let Some(be) = &self.hot_be { if tpe != FileType::Config && (cacheable || tpe != FileType::Pack) { be.write_bytes(tpe, id, cacheable, buf.clone()).await?; diff --git a/src/backend/local.rs b/src/backend/local.rs index 005c618..3252942 100644 --- a/src/backend/local.rs +++ b/src/backend/local.rs @@ -5,6 +5,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use async_trait::async_trait; +use bytes::Bytes; use filetime::{set_file_atime, set_file_mtime, FileTime}; use nix::sys::stat::{mknod, Mode, SFlag}; use nix::unistd::chown; @@ -99,8 +100,8 @@ impl ReadBackend for LocalBackend { Ok(walker.collect()) } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { - Ok(fs::read(self.path(tpe, id))?) + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { + Ok(fs::read(self.path(tpe, id))?.into()) } async fn read_partial( @@ -110,12 +111,12 @@ impl ReadBackend for LocalBackend { _cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { let mut file = File::open(self.path(tpe, id))?; file.seek(SeekFrom::Start(offset.try_into().unwrap()))?; let mut vec = vec![0; length.try_into().unwrap()]; file.read_exact(&mut vec)?; - Ok(vec) + Ok(vec.into()) } } @@ -136,7 +137,7 @@ impl WriteBackend for LocalBackend { tpe: FileType, id: &Id, _cacheable: bool, - buf: Vec, + buf: Bytes, ) -> Result<()> { v3!("writing tpe: {:?}, id: {}", &tpe, &id); let filename = self.path(tpe, id); @@ -265,13 +266,13 @@ impl LocalBackend { Ok(()) } - pub fn read_at(&self, item: impl AsRef, offset: u64, length: u64) -> Result> { + pub fn read_at(&self, item: impl AsRef, offset: u64, length: u64) -> Result { let filename = self.path.join(item); let mut file = File::open(&filename)?; file.seek(SeekFrom::Start(offset))?; let mut vec = vec![0; length.try_into().unwrap()]; file.read_exact(&mut vec).unwrap(); - Ok(vec) + Ok(vec.into()) } pub fn get_matching_file(&self, item: impl AsRef, size: u64) -> Option { diff --git a/src/backend/mod.rs b/src/backend/mod.rs index e4ef0c8..fb8838f 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::Bytes; use serde::{de::DeserializeOwned, Serialize}; use crate::id::Id; @@ -83,7 +84,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static { .collect()) } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result>; + async fn read_full(&self, tpe: FileType, id: &Id) -> Result; async fn read_partial( &self, tpe: FileType, @@ -91,7 +92,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static { cacheable: bool, offset: u32, length: u32, - ) -> Result>; + ) -> Result; async fn find_starts_with(&self, tpe: FileType, vec: &[String]) -> Result>> { #[derive(Clone, Copy, PartialEq, Eq)] @@ -146,13 +147,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static { #[async_trait] pub trait WriteBackend: ReadBackend { async fn create(&self) -> Result<()>; - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()>; + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()>; async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>; } @@ -165,5 +160,5 @@ pub trait ReadSource: Iterator> { pub trait WriteSource: Clone { fn create(&self, path: PathBuf, node: Node); fn set_metadata(&self, path: PathBuf, node: Node); - fn write_at(&self, path: PathBuf, offset: u64, data: Vec); + fn write_at(&self, path: PathBuf, offset: u64, data: Bytes); } diff --git a/src/backend/rclone.rs b/src/backend/rclone.rs index 6e0a917..e6488cd 100644 --- a/src/backend/rclone.rs +++ b/src/backend/rclone.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; +use bytes::Bytes; use rand::distributions::{Alphanumeric, DistString}; use rand::thread_rng; use sha1::{Digest, Sha1}; @@ -126,7 +127,7 @@ impl ReadBackend for RcloneBackend { self.rest.list_with_size(tpe).await } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { self.rest.read_full(tpe, id).await } @@ -137,7 +138,7 @@ impl ReadBackend for RcloneBackend { cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { self.rest .read_partial(tpe, id, cacheable, offset, length) .await @@ -150,13 +151,7 @@ impl WriteBackend for RcloneBackend { self.rest.create().await } - async fn write_bytes( - &self, - tpe: FileType, - id: &Id, - cacheable: bool, - buf: Vec, - ) -> Result<()> { + async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> { self.rest.write_bytes(tpe, id, cacheable, buf).await } diff --git a/src/backend/rest.rs b/src/backend/rest.rs index 8e29d3d..37952db 100644 --- a/src/backend/rest.rs +++ b/src/backend/rest.rs @@ -3,6 +3,7 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; +use bytes::Bytes; use reqwest::{Client, Url}; use serde::Deserialize; use vlog::*; @@ -108,7 +109,7 @@ impl ReadBackend for RestBackend { .await?) } - async fn read_full(&self, tpe: FileType, id: &Id) -> Result> { + async fn read_full(&self, tpe: FileType, id: &Id) -> Result { Ok(backoff::future::retry_notify( self.backoff.clone(), || async { @@ -134,7 +135,7 @@ impl ReadBackend for RestBackend { _cacheable: bool, offset: u32, length: u32, - ) -> Result> { + ) -> Result { let offset2 = offset + length - 1; let header_value = format!("bytes={}-{}", offset, offset2); Ok(backoff::future::retry_notify( @@ -179,7 +180,7 @@ impl WriteBackend for RestBackend { tpe: FileType, id: &Id, _cacheable: bool, - buf: Vec, + buf: Bytes, ) -> Result<()> { v3!("writing tpe: {:?}, id: {}", &tpe, &id); let req_builder = self.client.post(self.url(tpe, id)).body(buf); diff --git a/src/blob/packer.rs b/src/blob/packer.rs index 9cde696..fd0ff97 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -4,6 +4,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Result}; use binrw::{io::Cursor, BinWrite}; +use bytes::{Bytes, BytesMut}; use chrono::Local; use tokio::{spawn, task::JoinHandle}; use zstd::encode_all; @@ -50,7 +51,7 @@ impl PackSizer { pub struct Packer { be: BE, blob_type: BlobType, - file: Vec, + file: BytesMut, size: u32, count: u32, created: SystemTime, @@ -81,7 +82,7 @@ impl Packer { Ok(Self { be, blob_type, - file: Vec::new(), + file: BytesMut::new(), size: 0, count: 0, created: SystemTime::now(), @@ -263,8 +264,8 @@ impl Packer { // write file to backend let index = std::mem::take(&mut self.index); - let file = std::mem::replace(&mut self.file, Vec::new()); - self.file_writer.add(index, file, id).await?; + let file = std::mem::replace(&mut self.file, BytesMut::new()); + self.file_writer.add(index, file.into(), id).await?; Ok(()) } @@ -282,7 +283,7 @@ struct FileWriter { } impl FileWriter { - async fn add(&mut self, mut index: IndexPack, file: Vec, id: Id) -> Result<()> { + async fn add(&mut self, mut index: IndexPack, file: Bytes, id: Id) -> Result<()> { let be = self.be.clone(); let indexer = self.indexer.clone(); let cacheable = self.cacheable; diff --git a/src/commands/cat.rs b/src/commands/cat.rs index e9befc4..08e4dcd 100644 --- a/src/commands/cat.rs +++ b/src/commands/cat.rs @@ -57,7 +57,7 @@ pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result< async fn cat_file(be: &impl DecryptReadBackend, tpe: FileType, opt: IdOpt) -> Result<()> { let id = be.find_id(tpe, &opt.id).await?; let data = be.read_encrypted_full(tpe, &id).await?; - println!("{}", String::from_utf8(data)?); + println!("{}", String::from_utf8(data.to_vec())?); Ok(()) } @@ -68,7 +68,7 @@ async fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Re .await? .blob_from_backend(&tpe, &id) .await?; - print!("{}", String::from_utf8(data)?); + print!("{}", String::from_utf8(data.to_vec())?); Ok(()) } @@ -79,7 +79,7 @@ async fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> { let index = IndexBackend::new(be, progress_counter()).await?; let id = Tree::subtree_id(&index, snap.tree, Path::new(path)).await?; let data = index.blob_from_backend(&BlobType::Tree, &id).await?; - println!("{}", String::from_utf8(data)?); + println!("{}", String::from_utf8(data.to_vec())?); Ok(()) } diff --git a/src/commands/init.rs b/src/commands/init.rs index ee79c78..f5bae89 100644 --- a/src/commands/init.rs +++ b/src/commands/init.rs @@ -2,6 +2,7 @@ use std::fs::File; use std::io::BufReader; use anyhow::{bail, Result}; +use bytes::Bytes; use clap::Parser; use rpassword::{prompt_password, read_password_from_bufread}; @@ -60,7 +61,7 @@ pub(super) async fn execute( key_opts.username, key_opts.with_created, )?; - let data = serde_json::to_vec(&keyfile)?; + let data: Bytes = serde_json::to_vec(&keyfile)?.into(); let id = hash(&data); be.create().await?; be.write_bytes(FileType::Key, &id, false, data.clone()) diff --git a/src/commands/key.rs b/src/commands/key.rs index a954eba..76d0cf3 100644 --- a/src/commands/key.rs +++ b/src/commands/key.rs @@ -56,7 +56,8 @@ async fn add_key(be: &impl WriteBackend, key: Key, opts: AddOpts) -> Result<()> let keyfile = KeyFile::generate(key, &pass, opts.hostname, opts.username, opts.with_created)?; let data = serde_json::to_vec(&keyfile)?; let id = hash(&data); - be.write_bytes(FileType::Key, &id, false, data).await?; + be.write_bytes(FileType::Key, &id, false, data.into()) + .await?; println!("key {} successfully added.", id); Ok(()) diff --git a/src/index/mod.rs b/src/index/mod.rs index ba822b7..b11d4de 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use ambassador::{delegatable_trait, Delegate}; use anyhow::{anyhow, Result}; use async_trait::async_trait; +use bytes::Bytes; use derive_getters::Getters; use derive_more::Constructor; use futures::StreamExt; @@ -42,7 +43,7 @@ impl IndexEntry { } /// Get a blob described by IndexEntry from the backend - pub async fn read_data(&self, be: &B) -> Result> { + pub async fn read_data(&self, be: &B) -> Result { let data = be .read_encrypted_partial( FileType::Pack, @@ -96,7 +97,7 @@ pub trait IndexedBackend: ReadIndex + Clone + Sync + Send + 'static { fn be(&self) -> &Self::Backend; - async fn blob_from_backend(&self, tpe: &BlobType, id: &Id) -> Result> { + async fn blob_from_backend(&self, tpe: &BlobType, id: &Id) -> Result { match self.get_id(tpe, id) { None => Err(anyhow!("blob not found in index")), Some(ie) => ie.read_data(self.be()).await, From e36c39774566d975d222e2f93de86acd73ab5ed5 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Thu, 11 Aug 2022 19:45:09 +0200 Subject: [PATCH 4/6] Add possibility to set/unset custom backend options --- src/backend/cache.rs | 4 ++++ src/backend/choose.rs | 8 ++++++++ src/backend/decrypt.rs | 4 ++++ src/backend/dry_run.rs | 4 ++++ src/backend/hotcold.rs | 4 ++++ src/backend/local.rs | 4 ++++ src/backend/mod.rs | 3 +++ src/backend/rclone.rs | 4 ++++ src/backend/rest.rs | 4 ++++ 9 files changed, 39 insertions(+) diff --git a/src/backend/cache.rs b/src/backend/cache.rs index afb4c64..8bf698f 100644 --- a/src/backend/cache.rs +++ b/src/backend/cache.rs @@ -30,6 +30,10 @@ impl ReadBackend for CachedBackend { self.be.location() } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + self.be.set_option(option, value) + } + async fn list_with_size(&self, tpe: FileType) -> Result> { let list = self.be.list_with_size(tpe).await?; diff --git a/src/backend/choose.rs b/src/backend/choose.rs index a0297b2..d1c8a2f 100644 --- a/src/backend/choose.rs +++ b/src/backend/choose.rs @@ -36,6 +36,14 @@ impl ReadBackend for ChooseBackend { } } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + match self { + Local(local) => local.set_option(option, value), + Rest(rest) => rest.set_option(option, value), + Rclone(rclone) => rclone.set_option(option, value), + } + } + async fn list_with_size(&self, tpe: FileType) -> Result> { match self { Local(local) => local.list_with_size(tpe).await, diff --git a/src/backend/decrypt.rs b/src/backend/decrypt.rs index e4d9b07..7ee367b 100644 --- a/src/backend/decrypt.rs +++ b/src/backend/decrypt.rs @@ -203,6 +203,10 @@ impl ReadBackend for DecryptBackend { self.backend.location() } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + self.backend.set_option(option, value) + } + async fn list(&self, tpe: FileType) -> Result> { self.backend.list(tpe).await } diff --git a/src/backend/dry_run.rs b/src/backend/dry_run.rs index 6c557a9..d18fd22 100644 --- a/src/backend/dry_run.rs +++ b/src/backend/dry_run.rs @@ -47,6 +47,10 @@ impl ReadBackend for DryRunBackend { self.be.location() } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + self.be.set_option(option, value) + } + async fn list_with_size(&self, tpe: FileType) -> Result> { self.be.list_with_size(tpe).await } diff --git a/src/backend/hotcold.rs b/src/backend/hotcold.rs index 0792168..0c34b48 100644 --- a/src/backend/hotcold.rs +++ b/src/backend/hotcold.rs @@ -22,6 +22,10 @@ impl ReadBackend for HotColdBackend { self.be.location() } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + self.be.set_option(option, value) + } + async fn list_with_size(&self, tpe: FileType) -> Result> { self.be.list_with_size(tpe).await } diff --git a/src/backend/local.rs b/src/backend/local.rs index 3252942..0dce0b3 100644 --- a/src/backend/local.rs +++ b/src/backend/local.rs @@ -44,6 +44,10 @@ impl ReadBackend for LocalBackend { self.path.to_str().unwrap() } + fn set_option(&mut self, _option: &str, _value: &str) -> Result<()> { + Ok(()) + } + async fn list(&self, tpe: FileType) -> Result> { if tpe == FileType::Config { return Ok(match self.path.join("config").exists() { diff --git a/src/backend/mod.rs b/src/backend/mod.rs index fb8838f..578ca77 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -73,6 +73,9 @@ pub trait RepoFile: Serialize + DeserializeOwned + Sized + Send + Sync + 'static #[async_trait] pub trait ReadBackend: Clone + Send + Sync + 'static { fn location(&self) -> &str; + + fn set_option(&mut self, option: &str, value: &str) -> Result<()>; + async fn list_with_size(&self, tpe: FileType) -> Result>; async fn list(&self, tpe: FileType) -> Result> { diff --git a/src/backend/rclone.rs b/src/backend/rclone.rs index e6488cd..5240345 100644 --- a/src/backend/rclone.rs +++ b/src/backend/rclone.rs @@ -123,6 +123,10 @@ impl ReadBackend for RcloneBackend { self.rest.location() } + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + self.rest.set_option(option, value) + } + async fn list_with_size(&self, tpe: FileType) -> Result> { self.rest.list_with_size(tpe).await } diff --git a/src/backend/rest.rs b/src/backend/rest.rs index 37952db..f1da305 100644 --- a/src/backend/rest.rs +++ b/src/backend/rest.rs @@ -63,6 +63,10 @@ impl ReadBackend for RestBackend { self.url.as_str() } + fn set_option(&mut self, _option: &str, _value: &str) -> Result<()> { + Ok(()) + } + async fn list_with_size(&self, tpe: FileType) -> Result> { Ok(backoff::future::retry_notify( self.backoff.clone(), From 901718e37b728301c13267dfc7e930a870e63171 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Thu, 11 Aug 2022 19:47:22 +0200 Subject: [PATCH 5/6] rest backend: Add option to set/unset retry --- src/backend/rest.rs | 46 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/src/backend/rest.rs b/src/backend/rest.rs index f1da305..ac6001d 100644 --- a/src/backend/rest.rs +++ b/src/backend/rest.rs @@ -1,8 +1,8 @@ use std::time::Duration; -use anyhow::Result; +use anyhow::{bail, Result}; use async_trait::async_trait; -use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; +use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; use bytes::Bytes; use reqwest::{Client, Url}; use serde::Deserialize; @@ -10,11 +10,26 @@ use vlog::*; use super::{FileType, Id, ReadBackend, WriteBackend}; +#[derive(Clone)] +struct MaybeBackoff(Option); + +impl Backoff for MaybeBackoff { + fn next_backoff(&mut self) -> Option { + self.0.as_mut().and_then(|back| back.next_backoff()) + } + + fn reset(&mut self) { + if let Some(b) = self.0.as_mut() { + b.reset() + } + } +} + #[derive(Clone)] pub struct RestBackend { url: Url, client: Client, - backoff: ExponentialBackoff, + backoff: MaybeBackoff, } // TODO for backoff: Handle transient vs permanent errors! @@ -36,9 +51,11 @@ impl RestBackend { Self { url, client: Client::new(), - backoff: ExponentialBackoffBuilder::new() - .with_max_elapsed_time(Some(Duration::from_secs(120))) - .build(), + backoff: MaybeBackoff(Some( + ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(Duration::from_secs(120))) + .build(), + )), } } @@ -63,7 +80,22 @@ impl ReadBackend for RestBackend { self.url.as_str() } - fn set_option(&mut self, _option: &str, _value: &str) -> Result<()> { + fn set_option(&mut self, option: &str, value: &str) -> Result<()> { + if option == "retry" { + match value { + "true" => { + self.backoff = MaybeBackoff(Some( + ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(Duration::from_secs(120))) + .build(), + )); + } + "false" => { + self.backoff = MaybeBackoff(None); + } + val => bail!("value {val} not supported for option retry!"), + } + } Ok(()) } From cbca4159e59cb6213892a2c5646ef0bfff603084 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Thu, 11 Aug 2022 19:49:58 +0200 Subject: [PATCH 6/6] warmup: Set retry to false --- src/commands/restore.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/commands/restore.rs b/src/commands/restore.rs index 659872e..2079dbf 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -251,6 +251,8 @@ fn warm_up_command(file_infos: FileInfos, command: &str) -> Result<()> { async fn warm_up(be: &impl DecryptReadBackend, file_infos: FileInfos) -> Result<()> { let packs = file_infos.into_packs(); + let mut be = be.clone(); + be.set_option("retry", "false")?; let p = progress_counter(); p.set_length(packs.len() as u64);