Merge pull request #113 from rustic-rs/rest-retry

Don't use temporary files; add retries to rest/rclone backend
This commit is contained in:
aawsome 2022-08-12 13:18:40 +02:00 committed by GitHub
commit b51297b7dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 308 additions and 241 deletions

16
Cargo.lock generated
View File

@ -110,6 +110,20 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"futures-core",
"getrandom 0.2.7",
"instant",
"pin-project-lite",
"rand",
"tokio",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -1446,8 +1460,10 @@ dependencies = [
"ambassador",
"anyhow",
"async-trait",
"backoff",
"base64",
"binrw",
"bytes",
"bytesize",
"cachedir",
"cdc",

View File

@ -45,8 +45,8 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde-aux = "3"
# other dependencies
bytes = "1"
chrono = { version = "0.4", features = ["serde"] }
tempfile = "3"
zstd = "0.11"
enum-map = "2"
enum-map-derive = "0.10"
@ -57,8 +57,10 @@ nix = "0.24"
filetime = "0.2"
# rest backend
reqwest = {version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] }
backoff = { version = "0.4", features = ["tokio"] }
# rclone backend
sha1 = "0.10"
tempfile = "3"
# cache
dirs = "4"
cachedir = "0.3"

View File

@ -1,10 +1,11 @@
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{copy, Read, Seek, SeekFrom, Write};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use dirs::cache_dir;
use vlog::*;
use walkdir::WalkDir;
@ -29,6 +30,10 @@ impl<BE: WriteBackend> ReadBackend for CachedBackend<BE> {
self.be.location()
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
self.be.set_option(option, value)
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
let list = self.be.list_with_size(tpe).await?;
@ -41,7 +46,7 @@ impl<BE: WriteBackend> ReadBackend for CachedBackend<BE> {
Ok(list)
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
match (&self.cache, tpe.is_cacheable()) {
(None, _) | (Some(_), false) => self.be.read_full(tpe, id).await,
(Some(cache), true) => match cache.read_full(tpe, id).await {
@ -64,7 +69,7 @@ impl<BE: WriteBackend> ReadBackend for CachedBackend<BE> {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
match (&self.cache, cacheable || tpe.is_cacheable()) {
(None, _) | (Some(_), false) => {
self.be
@ -97,24 +102,13 @@ impl<BE: WriteBackend> WriteBackend for CachedBackend<BE> {
self.be.create().await
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, mut f: File) -> Result<()> {
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
if let Some(cache) = &self.cache {
if cacheable || tpe.is_cacheable() {
let f_cache = f.try_clone()?;
let _ = cache.write_file(tpe, id, cacheable, f_cache).await;
f.seek(SeekFrom::Start(0))?;
}
}
self.be.write_file(tpe, id, cacheable, f).await
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
if let Some(cache) = &self.cache {
if tpe.is_cacheable() {
let _ = cache.write_bytes(tpe, id, buf.clone()).await;
}
}
self.be.write_bytes(tpe, id, buf).await
self.be.write_bytes(tpe, id, cacheable, buf).await
}
async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {
@ -211,11 +205,11 @@ impl Cache {
Ok(())
}
pub async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
pub async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
v3!("cache reading tpe: {:?}, id: {}", &tpe, &id);
let data = fs::read(self.path(tpe, id))?;
v3!("cache hit!");
Ok(data)
Ok(data.into())
}
async fn read_partial(
@ -224,7 +218,7 @@ impl Cache {
id: &Id,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
v3!(
"cache reading tpe: {:?}, id: {}, offset: {}",
&tpe,
@ -236,28 +230,10 @@ impl Cache {
let mut vec = vec![0; length as usize];
file.read_exact(&mut vec)?;
v3!("cache hit!");
Ok(vec)
Ok(vec.into())
}
async fn write_file(
&self,
tpe: FileType,
id: &Id,
_cacheable: bool,
mut f: File,
) -> Result<()> {
v3!("cache writing tpe: {:?}, id: {}", &tpe, &id);
fs::create_dir_all(self.dir(tpe, id))?;
let filename = self.path(tpe, id);
let mut file = fs::OpenOptions::new()
.create(true)
.write(true)
.open(&filename)?;
copy(&mut f, &mut file)?;
Ok(())
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Bytes) -> Result<()> {
v3!("cache writing tpe: {:?}, id: {}", &tpe, &id);
fs::create_dir_all(self.dir(tpe, id))?;
let filename = self.path(tpe, id);

View File

@ -1,7 +1,6 @@
use std::fs::File;
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use super::{FileType, Id, ReadBackend, WriteBackend};
use super::{LocalBackend, RcloneBackend, RestBackend};
@ -37,6 +36,14 @@ impl ReadBackend for ChooseBackend {
}
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
match self {
Local(local) => local.set_option(option, value),
Rest(rest) => rest.set_option(option, value),
Rclone(rclone) => rclone.set_option(option, value),
}
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
match self {
Local(local) => local.list_with_size(tpe).await,
@ -45,7 +52,7 @@ impl ReadBackend for ChooseBackend {
}
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
match self {
Local(local) => local.read_full(tpe, id).await,
Rest(rest) => rest.read_full(tpe, id).await,
@ -60,7 +67,7 @@ impl ReadBackend for ChooseBackend {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
match self {
Local(local) => local.read_partial(tpe, id, cacheable, offset, length).await,
Rest(rest) => rest.read_partial(tpe, id, cacheable, offset, length).await,
@ -83,19 +90,11 @@ impl WriteBackend for ChooseBackend {
}
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> {
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
match self {
Local(local) => local.write_file(tpe, id, cacheable, f).await,
Rest(rest) => rest.write_file(tpe, id, cacheable, f).await,
Rclone(rclone) => rclone.write_file(tpe, id, cacheable, f).await,
}
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
match self {
Local(local) => local.write_bytes(tpe, id, buf).await,
Rest(rest) => rest.write_bytes(tpe, id, buf).await,
Rclone(rclone) => rclone.write_bytes(tpe, id, buf).await,
Local(local) => local.write_bytes(tpe, id, cacheable, buf).await,
Rest(rest) => rest.write_bytes(tpe, id, cacheable, buf).await,
Rclone(rclone) => rclone.write_bytes(tpe, id, cacheable, buf).await,
}
}

View File

@ -1,8 +1,8 @@
use std::fs::File;
use std::num::NonZeroU32;
use anyhow::{bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream, stream::FuturesUnordered, StreamExt};
use indicatif::ProgressBar;
use tokio::{spawn, task::JoinHandle};
@ -16,7 +16,7 @@ impl<T: DecryptWriteBackend + DecryptReadBackend> DecryptFullBackend for T {}
#[async_trait]
pub trait DecryptReadBackend: ReadBackend {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>>;
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes>;
async fn read_encrypted_partial(
&self,
tpe: FileType,
@ -25,7 +25,7 @@ pub trait DecryptReadBackend: ReadBackend {
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Vec<u8>>;
) -> Result<Bytes>;
async fn get_file<F: RepoFile>(&self, id: &Id) -> Result<F> {
let data = self.read_encrypted_full(F::TYPE, id).await?;
@ -149,7 +149,7 @@ impl<R: WriteBackend, C: CryptoKey> DecryptWriteBackend for DecryptBackend<R, C>
None => self.key().encrypt_data(data)?,
};
let id = hash(&data);
self.write_bytes(tpe, &id, data).await?;
self.write_bytes(tpe, &id, false, data.into()).await?;
Ok(id)
}
@ -160,7 +160,7 @@ impl<R: WriteBackend, C: CryptoKey> DecryptWriteBackend for DecryptBackend<R, C>
#[async_trait]
impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
let decrypted = self
.key
.decrypt_data(&self.backend.read_full(tpe, id).await?)?;
@ -168,7 +168,8 @@ impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
b'{' | b'[' => decrypted, // not compressed
2 => decode_all(&decrypted[1..])?, // 2 indicates compressed data following
_ => bail!("not supported"),
})
}
.into())
}
async fn read_encrypted_partial(
@ -179,7 +180,7 @@ impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
let mut data = self.key.decrypt_data(
&self
.backend
@ -192,7 +193,7 @@ impl<R: ReadBackend, C: CryptoKey> DecryptReadBackend for DecryptBackend<R, C> {
bail!("length of uncompressed data does not match!");
}
}
Ok(data)
Ok(data.into())
}
}
@ -202,6 +203,10 @@ impl<R: ReadBackend, C: CryptoKey> ReadBackend for DecryptBackend<R, C> {
self.backend.location()
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
self.backend.set_option(option, value)
}
async fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
self.backend.list(tpe).await
}
@ -210,7 +215,7 @@ impl<R: ReadBackend, C: CryptoKey> ReadBackend for DecryptBackend<R, C> {
self.backend.list_with_size(tpe).await
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
self.backend.read_full(tpe, id).await
}
@ -221,7 +226,7 @@ impl<R: ReadBackend, C: CryptoKey> ReadBackend for DecryptBackend<R, C> {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
self.backend
.read_partial(tpe, id, cacheable, offset, length)
.await
@ -234,12 +239,8 @@ impl<R: WriteBackend, C: CryptoKey> WriteBackend for DecryptBackend<R, C> {
self.backend.create().await
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> {
self.backend.write_file(tpe, id, cacheable, f).await
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
self.backend.write_bytes(tpe, id, buf).await
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
self.backend.write_bytes(tpe, id, cacheable, buf).await
}
async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {

View File

@ -1,8 +1,8 @@
use std::fs::File;
use std::num::NonZeroU32;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use super::{
DecryptFullBackend, DecryptReadBackend, DecryptWriteBackend, FileType, Id, ReadBackend,
@ -23,7 +23,7 @@ impl<BE: DecryptFullBackend> DryRunBackend<BE> {
#[async_trait]
impl<BE: DecryptFullBackend> DecryptReadBackend for DryRunBackend<BE> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
self.be.read_encrypted_full(tpe, id).await
}
async fn read_encrypted_partial(
@ -34,7 +34,7 @@ impl<BE: DecryptFullBackend> DecryptReadBackend for DryRunBackend<BE> {
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
self.be
.read_encrypted_partial(tpe, id, cacheable, offset, length, uncompressed_length)
.await
@ -47,11 +47,15 @@ impl<BE: DecryptFullBackend> ReadBackend for DryRunBackend<BE> {
self.be.location()
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
self.be.set_option(option, value)
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
self.be.list_with_size(tpe).await
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
self.be.read_full(tpe, id).await
}
@ -62,7 +66,7 @@ impl<BE: DecryptFullBackend> ReadBackend for DryRunBackend<BE> {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
self.be
.read_partial(tpe, id, cacheable, offset, length)
.await
@ -101,17 +105,10 @@ impl<BE: DecryptFullBackend> WriteBackend for DryRunBackend<BE> {
}
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> {
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
match self.dry_run {
true => Ok(()),
false => self.be.write_file(tpe, id, cacheable, f).await,
}
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
match self.dry_run {
true => Ok(()),
false => self.be.write_bytes(tpe, id, buf).await,
false => self.be.write_bytes(tpe, id, cacheable, buf).await,
}
}

View File

@ -1,8 +1,6 @@
use std::fs::File;
use std::io::{Seek, SeekFrom};
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use super::{FileType, Id, ReadBackend, WriteBackend};
@ -24,11 +22,15 @@ impl<BE: WriteBackend> ReadBackend for HotColdBackend<BE> {
self.be.location()
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
self.be.set_option(option, value)
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
self.be.list_with_size(tpe).await
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
match &self.hot_be {
None => self.be.read_full(tpe, id).await,
Some(be) => be.read_full(tpe, id).await,
@ -42,7 +44,7 @@ impl<BE: WriteBackend> ReadBackend for HotColdBackend<BE> {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
match (&self.hot_be, cacheable || tpe != FileType::Pack) {
(None, _) | (Some(_), false) => {
self.be
@ -60,24 +62,13 @@ impl<BE: WriteBackend> WriteBackend for HotColdBackend<BE> {
self.be.create().await
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, mut f: File) -> Result<()> {
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
if let Some(be) = &self.hot_be {
if cacheable || tpe != FileType::Pack {
let f_hot = f.try_clone()?;
be.write_file(tpe, id, cacheable, f_hot).await?;
f.seek(SeekFrom::Start(0))?;
if tpe != FileType::Config && (cacheable || tpe != FileType::Pack) {
be.write_bytes(tpe, id, cacheable, buf.clone()).await?;
}
}
self.be.write_file(tpe, id, cacheable, f).await
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
if let Some(be) = &self.hot_be {
if tpe != FileType::Config {
be.write_bytes(tpe, id, buf.clone()).await?;
}
}
self.be.write_bytes(tpe, id, buf).await
self.be.write_bytes(tpe, id, cacheable, buf).await
}
async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {

View File

@ -1,10 +1,11 @@
use std::fs::{self, File};
use std::io::{copy, Read, Seek, SeekFrom, Write};
use std::io::{Read, Seek, SeekFrom, Write};
use std::os::unix::fs::{symlink, FileExt, PermissionsExt};
use std::path::{Path, PathBuf};
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use filetime::{set_file_atime, set_file_mtime, FileTime};
use nix::sys::stat::{mknod, Mode, SFlag};
use nix::unistd::chown;
@ -43,6 +44,10 @@ impl ReadBackend for LocalBackend {
self.path.to_str().unwrap()
}
fn set_option(&mut self, _option: &str, _value: &str) -> Result<()> {
Ok(())
}
async fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
if tpe == FileType::Config {
return Ok(match self.path.join("config").exists() {
@ -99,8 +104,8 @@ impl ReadBackend for LocalBackend {
Ok(walker.collect())
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
Ok(fs::read(self.path(tpe, id))?)
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
Ok(fs::read(self.path(tpe, id))?.into())
}
async fn read_partial(
@ -110,12 +115,12 @@ impl ReadBackend for LocalBackend {
_cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
let mut file = File::open(self.path(tpe, id))?;
file.seek(SeekFrom::Start(offset.try_into().unwrap()))?;
let mut vec = vec![0; length.try_into().unwrap()];
file.read_exact(&mut vec)?;
Ok(vec)
Ok(vec.into())
}
}
@ -131,26 +136,13 @@ impl WriteBackend for LocalBackend {
Ok(())
}
async fn write_file(
async fn write_bytes(
&self,
tpe: FileType,
id: &Id,
_cacheable: bool,
mut f: File,
buf: Bytes,
) -> Result<()> {
v3!("writing tpe: {:?}, id: {}", &tpe, &id);
let filename = self.path(tpe, id);
let mut file = fs::OpenOptions::new()
.create(true)
.write(true)
.open(&filename)?;
file.set_len(0)?;
copy(&mut f, &mut file)?;
file.sync_all()?;
Ok(())
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
v3!("writing tpe: {:?}, id: {}", &tpe, &id);
let filename = self.path(tpe, id);
let mut file = fs::OpenOptions::new()
@ -278,13 +270,13 @@ impl LocalBackend {
Ok(())
}
pub fn read_at(&self, item: impl AsRef<Path>, offset: u64, length: u64) -> Result<Vec<u8>> {
pub fn read_at(&self, item: impl AsRef<Path>, offset: u64, length: u64) -> Result<Bytes> {
let filename = self.path.join(item);
let mut file = File::open(&filename)?;
file.seek(SeekFrom::Start(offset))?;
let mut vec = vec![0; length.try_into().unwrap()];
file.read_exact(&mut vec).unwrap();
Ok(vec)
Ok(vec.into())
}
pub fn get_matching_file(&self, item: impl AsRef<Path>, size: u64) -> Option<File> {

View File

@ -1,9 +1,9 @@
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use serde::{de::DeserializeOwned, Serialize};
use crate::id::Id;
@ -73,6 +73,9 @@ pub trait RepoFile: Serialize + DeserializeOwned + Sized + Send + Sync + 'static
#[async_trait]
pub trait ReadBackend: Clone + Send + Sync + 'static {
fn location(&self) -> &str;
fn set_option(&mut self, option: &str, value: &str) -> Result<()>;
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>>;
async fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
@ -84,7 +87,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static {
.collect())
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>>;
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes>;
async fn read_partial(
&self,
tpe: FileType,
@ -92,7 +95,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>>;
) -> Result<Bytes>;
async fn find_starts_with(&self, tpe: FileType, vec: &[String]) -> Result<Vec<Result<Id>>> {
#[derive(Clone, Copy, PartialEq, Eq)]
@ -147,8 +150,7 @@ pub trait ReadBackend: Clone + Send + Sync + 'static {
#[async_trait]
pub trait WriteBackend: ReadBackend {
async fn create(&self) -> Result<()>;
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()>;
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()>;
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()>;
async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()>;
}
@ -161,5 +163,5 @@ pub trait ReadSource: Iterator<Item = Result<(PathBuf, Node)>> {
pub trait WriteSource: Clone {
fn create(&self, path: PathBuf, node: Node);
fn set_metadata(&self, path: PathBuf, node: Node);
fn write_at(&self, path: PathBuf, offset: u64, data: Vec<u8>);
fn write_at(&self, path: PathBuf, offset: u64, data: Bytes);
}

View File

@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use bytes::Bytes;
use rand::distributions::{Alphanumeric, DistString};
use rand::thread_rng;
use sha1::{Digest, Sha1};
@ -122,11 +123,15 @@ impl ReadBackend for RcloneBackend {
self.rest.location()
}
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
self.rest.set_option(option, value)
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
self.rest.list_with_size(tpe).await
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
self.rest.read_full(tpe, id).await
}
@ -137,7 +142,7 @@ impl ReadBackend for RcloneBackend {
cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
self.rest
.read_partial(tpe, id, cacheable, offset, length)
.await
@ -150,12 +155,8 @@ impl WriteBackend for RcloneBackend {
self.rest.create().await
}
async fn write_file(&self, tpe: FileType, id: &Id, cacheable: bool, f: File) -> Result<()> {
self.rest.write_file(tpe, id, cacheable, f).await
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
self.rest.write_bytes(tpe, id, buf).await
async fn write_bytes(&self, tpe: FileType, id: &Id, cacheable: bool, buf: Bytes) -> Result<()> {
self.rest.write_bytes(tpe, id, cacheable, buf).await
}
async fn remove(&self, tpe: FileType, id: &Id, cacheable: bool) -> Result<()> {

View File

@ -1,17 +1,40 @@
use std::fs::File;
use std::time::Duration;
use anyhow::Result;
use anyhow::{bail, Result};
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder};
use bytes::Bytes;
use reqwest::{Client, Url};
use serde::Deserialize;
use vlog::*;
use super::{FileType, Id, ReadBackend, WriteBackend};
#[derive(Clone)]
struct MaybeBackoff(Option<ExponentialBackoff>);
impl Backoff for MaybeBackoff {
fn next_backoff(&mut self) -> Option<Duration> {
self.0.as_mut().and_then(|back| back.next_backoff())
}
fn reset(&mut self) {
if let Some(b) = self.0.as_mut() {
b.reset()
}
}
}
#[derive(Clone)]
pub struct RestBackend {
url: Url,
client: Client,
backoff: MaybeBackoff,
}
// TODO for backoff: Handle transient vs permanent errors!
fn notify(err: reqwest::Error, duration: Duration) {
println!("Error {err} at {duration:?}, retrying");
}
impl RestBackend {
@ -28,6 +51,11 @@ impl RestBackend {
Self {
url,
client: Client::new(),
backoff: MaybeBackoff(Some(
ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build(),
)),
}
}
@ -52,55 +80,88 @@ impl ReadBackend for RestBackend {
self.url.as_str()
}
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
if tpe == FileType::Config {
return Ok(
match self
.client
.head(self.url.join("config").unwrap())
.send()
.await?
.status()
.is_success()
{
true => vec![(Id::default(), 0)],
false => Vec::new(),
},
);
fn set_option(&mut self, option: &str, value: &str) -> Result<()> {
if option == "retry" {
match value {
"true" => {
self.backoff = MaybeBackoff(Some(
ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(Duration::from_secs(120)))
.build(),
));
}
"false" => {
self.backoff = MaybeBackoff(None);
}
val => bail!("value {val} not supported for option retry!"),
}
}
let mut path = tpe.name().to_string();
path.push('/');
let url = self.url.join(&path).unwrap();
// format which is delivered by the REST-service
#[derive(Deserialize)]
struct ListEntry {
name: Id,
size: u32,
}
let list = self
.client
.get(url)
.header("Accept", "application/vnd.x.restic.rest.v2")
.send()
.await?
.json::<Vec<ListEntry>>()
.await?;
Ok(list.into_iter().map(|i| (i.name, i.size)).collect())
Ok(())
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Vec<u8>> {
Ok(self
.client
.get(self.url(tpe, id))
.send()
.await?
.bytes()
.await?
.into_iter()
.collect())
async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
if tpe == FileType::Config {
return Ok(
match self
.client
.head(self.url.join("config").unwrap())
.send()
.await?
.status()
.is_success()
{
true => vec![(Id::default(), 0)],
false => Vec::new(),
},
);
}
let mut path = tpe.name().to_string();
path.push('/');
let url = self.url.join(&path).unwrap();
// format which is delivered by the REST-service
#[derive(Deserialize)]
struct ListEntry {
name: Id,
size: u32,
}
let list = self
.client
.get(url)
.header("Accept", "application/vnd.x.restic.rest.v2")
.send()
.await?
.json::<Vec<ListEntry>>()
.await?;
Ok(list.into_iter().map(|i| (i.name, i.size)).collect())
},
notify,
)
.await?)
}
async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
Ok(self
.client
.get(self.url(tpe, id))
.send()
.await?
.bytes()
.await?
.into_iter()
.collect())
},
notify,
)
.await?)
}
async fn read_partial(
@ -110,51 +171,76 @@ impl ReadBackend for RestBackend {
_cacheable: bool,
offset: u32,
length: u32,
) -> Result<Vec<u8>> {
) -> Result<Bytes> {
let offset2 = offset + length - 1;
let header_value = format!("bytes={}-{}", offset, offset2);
Ok(self
.client
.get(self.url(tpe, id))
.header("Range", header_value)
.send()
.await?
.bytes()
.await?
.into_iter()
.collect())
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
Ok(self
.client
.get(self.url(tpe, id))
.header("Range", header_value.clone())
.send()
.await?
.bytes()
.await?
.into_iter()
.collect())
},
notify,
)
.await?)
}
}
#[async_trait]
impl WriteBackend for RestBackend {
async fn create(&self) -> Result<()> {
self.client
.post(self.url.join("?create=true").unwrap())
.send()
.await?;
Ok(())
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
self.client
.post(self.url.join("?create=true").unwrap())
.send()
.await?;
Ok(())
},
notify,
)
.await?)
}
async fn write_file(&self, tpe: FileType, id: &Id, _cacheable: bool, f: File) -> Result<()> {
async fn write_bytes(
&self,
tpe: FileType,
id: &Id,
_cacheable: bool,
buf: Bytes,
) -> Result<()> {
v3!("writing tpe: {:?}, id: {}", &tpe, &id);
self.client
.post(self.url(tpe, id))
.body(tokio::fs::File::from_std(f))
.send()
.await?;
Ok(())
}
async fn write_bytes(&self, tpe: FileType, id: &Id, buf: Vec<u8>) -> Result<()> {
v3!("writing tpe: {:?}, id: {}", &tpe, &id);
self.client.post(self.url(tpe, id)).body(buf).send().await?;
Ok(())
let req_builder = self.client.post(self.url(tpe, id)).body(buf);
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
req_builder.try_clone().unwrap().send().await?;
Ok(())
},
notify,
)
.await?)
}
async fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> Result<()> {
v3!("removing tpe: {:?}, id: {}", &tpe, &id);
self.client.delete(self.url(tpe, id)).send().await?;
Ok(())
Ok(backoff::future::retry_notify(
self.backoff.clone(),
|| async {
self.client.delete(self.url(tpe, id)).send().await?;
Ok(())
},
notify,
)
.await?)
}
}

View File

@ -1,13 +1,11 @@
use integer_sqrt::IntegerSquareRoot;
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::num::NonZeroU32;
use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Result};
use binrw::{io::Cursor, BinWrite};
use bytes::{Bytes, BytesMut};
use chrono::Local;
use tempfile::tempfile;
use tokio::{spawn, task::JoinHandle};
use zstd::encode_all;
@ -53,7 +51,7 @@ impl PackSizer {
pub struct Packer<BE: DecryptWriteBackend> {
be: BE,
blob_type: BlobType,
file: File,
file: BytesMut,
size: u32,
count: u32,
created: SystemTime,
@ -84,7 +82,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
Ok(Self {
be,
blob_type,
file: tempfile()?,
file: BytesMut::new(),
size: 0,
count: 0,
created: SystemTime::now(),
@ -104,7 +102,8 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
pub async fn write_data(&mut self, data: &[u8]) -> Result<u32> {
self.hasher.update(data);
let len = self.file.write(data)?.try_into()?;
let len = data.len().try_into()?;
self.file.extend_from_slice(data);
self.size += len;
Ok(len)
}
@ -265,8 +264,8 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
// write file to backend
let index = std::mem::take(&mut self.index);
let file = std::mem::replace(&mut self.file, tempfile()?);
self.file_writer.add(index, file, id).await?;
let file = std::mem::replace(&mut self.file, BytesMut::new());
self.file_writer.add(index, file.into(), id).await?;
Ok(())
}
@ -284,13 +283,12 @@ struct FileWriter<BE: DecryptWriteBackend> {
}
impl<BE: DecryptWriteBackend> FileWriter<BE> {
async fn add(&mut self, mut index: IndexPack, mut file: File, id: Id) -> Result<()> {
async fn add(&mut self, mut index: IndexPack, file: Bytes, id: Id) -> Result<()> {
let be = self.be.clone();
let indexer = self.indexer.clone();
let cacheable = self.cacheable;
let new_future = spawn(async move {
file.seek(SeekFrom::Start(0))?;
be.write_file(FileType::Pack, &id, cacheable, file).await?;
be.write_bytes(FileType::Pack, &id, cacheable, file).await?;
index.time = Some(Local::now());
indexer.write().await.add(index).await?;
Ok(())

View File

@ -57,7 +57,7 @@ pub(super) async fn execute(be: &impl DecryptReadBackend, opts: Opts) -> Result<
async fn cat_file(be: &impl DecryptReadBackend, tpe: FileType, opt: IdOpt) -> Result<()> {
let id = be.find_id(tpe, &opt.id).await?;
let data = be.read_encrypted_full(tpe, &id).await?;
println!("{}", String::from_utf8(data)?);
println!("{}", String::from_utf8(data.to_vec())?);
Ok(())
}
@ -68,7 +68,7 @@ async fn cat_blob(be: &impl DecryptReadBackend, tpe: BlobType, opt: IdOpt) -> Re
.await?
.blob_from_backend(&tpe, &id)
.await?;
print!("{}", String::from_utf8(data)?);
print!("{}", String::from_utf8(data.to_vec())?);
Ok(())
}
@ -79,7 +79,7 @@ async fn cat_tree(be: &impl DecryptReadBackend, opts: TreeOpts) -> Result<()> {
let index = IndexBackend::new(be, progress_counter()).await?;
let id = Tree::subtree_id(&index, snap.tree, Path::new(path)).await?;
let data = index.blob_from_backend(&BlobType::Tree, &id).await?;
println!("{}", String::from_utf8(data)?);
println!("{}", String::from_utf8(data.to_vec())?);
Ok(())
}

View File

@ -2,6 +2,7 @@ use std::fs::File;
use std::io::BufReader;
use anyhow::{bail, Result};
use bytes::Bytes;
use clap::Parser;
use rpassword::{prompt_password, read_password_from_bufread};
@ -60,14 +61,15 @@ pub(super) async fn execute(
key_opts.username,
key_opts.with_created,
)?;
let data = serde_json::to_vec(&keyfile)?;
let data: Bytes = serde_json::to_vec(&keyfile)?.into();
let id = hash(&data);
be.create().await?;
be.write_bytes(FileType::Key, &id, data.clone()).await?;
be.write_bytes(FileType::Key, &id, false, data.clone())
.await?;
if let Some(hot_be) = hot_be {
hot_be.create().await?;
hot_be.write_bytes(FileType::Key, &id, data).await?;
hot_be.write_bytes(FileType::Key, &id, false, data).await?;
}
println!("key {} successfully added.", id);

View File

@ -56,7 +56,8 @@ async fn add_key(be: &impl WriteBackend, key: Key, opts: AddOpts) -> Result<()>
let keyfile = KeyFile::generate(key, &pass, opts.hostname, opts.username, opts.with_created)?;
let data = serde_json::to_vec(&keyfile)?;
let id = hash(&data);
be.write_bytes(FileType::Key, &id, data).await?;
be.write_bytes(FileType::Key, &id, false, data.into())
.await?;
println!("key {} successfully added.", id);
Ok(())

View File

@ -251,6 +251,8 @@ fn warm_up_command(file_infos: FileInfos, command: &str) -> Result<()> {
async fn warm_up(be: &impl DecryptReadBackend, file_infos: FileInfos) -> Result<()> {
let packs = file_infos.into_packs();
let mut be = be.clone();
be.set_option("retry", "false")?;
let p = progress_counter();
p.set_length(packs.len() as u64);

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use ambassador::{delegatable_trait, Delegate};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use bytes::Bytes;
use derive_getters::Getters;
use derive_more::Constructor;
use futures::StreamExt;
@ -42,7 +43,7 @@ impl IndexEntry {
}
/// Get a blob described by IndexEntry from the backend
pub async fn read_data<B: DecryptReadBackend>(&self, be: &B) -> Result<Vec<u8>> {
pub async fn read_data<B: DecryptReadBackend>(&self, be: &B) -> Result<Bytes> {
let data = be
.read_encrypted_partial(
FileType::Pack,
@ -96,7 +97,7 @@ pub trait IndexedBackend: ReadIndex + Clone + Sync + Send + 'static {
fn be(&self) -> &Self::Backend;
async fn blob_from_backend(&self, tpe: &BlobType, id: &Id) -> Result<Vec<u8>> {
async fn blob_from_backend(&self, tpe: &BlobType, id: &Id) -> Result<Bytes> {
match self.get_id(tpe, id) {
None => Err(anyhow!("blob not found in index")),
Some(ie) => ie.read_data(self.be()).await,