backup: parallelize processing (especially for small files)

This commit is contained in:
Alexander Weiss 2023-04-22 13:37:16 +02:00
parent 88410511b1
commit e8792ac5f1
11 changed files with 196 additions and 157 deletions

36
Cargo.lock generated
View File

@ -448,6 +448,20 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
@ -482,6 +496,16 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.15"
@ -1479,6 +1503,17 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "pariter"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9"
dependencies = [
"crossbeam",
"crossbeam-channel",
"num_cpus",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1923,6 +1958,7 @@ dependencies = [
"merge",
"nix",
"nom",
"pariter",
"path-dedot",
"quickcheck",
"quickcheck_macros",

View File

@ -33,6 +33,7 @@ log = "0.4"
# parallelize
crossbeam-channel = "0.5"
rayon = "1"
pariter = "0.5"
#crypto
aes256ctr_poly1305aes = "0.1"
rand = "0.8"

View File

@ -6,6 +6,7 @@ Bugs fixed:
- restore: Warm-up options given by the command line didn't work. This has been fixed.
New features:
- backup: Backing up (small) files is now much more parallelized.
- prune: Added option --repack-all
- Option --dry-run is now a global option and can also be defined in the config file or via env variable
- Updated to clap v4

View File

@ -2,7 +2,6 @@ use std::io::Read;
use anyhow::{anyhow, Result};
use indicatif::ProgressBar;
use rayon::prelude::*;
use crate::backend::{DecryptWriteBackend, ReadSourceOpen};
use crate::blob::{BlobType, Node, NodeType, Packer, PackerStats};
@ -69,28 +68,22 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> FileArchiver<BE, I> {
node: Node,
p: ProgressBar,
) -> Result<(Node, u64)> {
let mut chunks: Vec<_> =
ChunkIter::new(r, *node.meta().size() as usize, self.rabin.clone())
.enumerate() // see below
.par_bridge()
.map(|(num, chunk)| {
let chunk = chunk?;
let id = hash(&chunk);
let size = chunk.len() as u64;
let chunks: Vec<_> = ChunkIter::new(r, *node.meta().size() as usize, self.rabin.clone())
.map(|chunk| {
let chunk = chunk?;
let id = hash(&chunk);
let size = chunk.len() as u64;
if !self.index.has_data(&id) {
self.data_packer.add(&chunk, &id)?;
}
p.inc(size);
Ok((num, id, size))
})
.collect::<Result<_>>()?;
if !self.index.has_data(&id) {
self.data_packer.add(chunk.into(), id)?;
}
p.inc(size);
Ok((id, size))
})
.collect::<Result<_>>()?;
// As par_bridge doesn't guarantee to keep the order, we sort by the enumeration
chunks.sort_unstable_by_key(|x| x.0);
let filesize = chunks.iter().map(|x| x.2).sum();
let content = chunks.into_iter().map(|x| x.1).collect();
let filesize = chunks.iter().map(|x| x.1).sum();
let content = chunks.into_iter().map(|x| x.0).collect();
let mut node = node;
node.set_content(content);

View File

@ -14,6 +14,7 @@ use anyhow::Result;
use chrono::Local;
use indicatif::ProgressBar;
use log::*;
use pariter::{scope, IteratorExt};
use crate::backend::{DecryptWriteBackend, ReadSource, ReadSourceEntry};
use crate::blob::BlobType;
@ -59,13 +60,18 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
})
}
pub fn archive(
pub fn archive<R>(
mut self,
src: impl ReadSource,
src: R,
backup_path: &Path,
as_path: Option<&PathBuf>,
p: &ProgressBar,
) -> Result<SnapshotFile> {
) -> Result<SnapshotFile>
where
R: ReadSource + 'static,
<R as ReadSource>::Open: Send,
<R as ReadSource>::Iter: Send,
{
if !p.is_hidden() {
if let Some(size) = src.size()? {
p.set_length(size);
@ -104,28 +110,28 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
// handle beginning and ending of trees
let iter = TreeIterator::new(iter);
// use parent snapshot
let iter = iter.filter_map(|item| match self.parent.process(item) {
Ok(item) => Some(item),
Err(err) => {
warn!("ignoring error reading parent snapshot: {err:?}");
None
}
});
// archive files
let iter = iter.filter_map(|item| match self.file_archiver.process(item, p.clone()) {
Ok(item) => Some(item),
Err(err) => {
warn!("ignoring error: {err:?}");
None
}
});
// save items in trees
for item in iter {
self.tree_archiver.add(item)?;
}
scope(|scope| -> Result<_> {
// use parent snapshot
iter.filter_map(|item| match self.parent.process(item) {
Ok(item) => Some(item),
Err(err) => {
warn!("ignoring error reading parent snapshot: {err:?}");
None
}
})
// archive files in parallel
.parallel_map_scoped(scope, |item| self.file_archiver.process(item, p.clone()))
.readahead_scoped(scope)
.filter_map(|item| match item {
Ok(item) => Some(item),
Err(err) => {
warn!("ignoring error: {err:?}");
None
}
})
.try_for_each(|item| self.tree_archiver.add(item))
})
.unwrap()?;
let stats = self.file_archiver.finalize()?;
let (id, mut summary) = self.tree_archiver.finalize(self.parent_tree)?;

View File

@ -122,7 +122,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> TreeArchiver<BE, I> {
}
if !self.index.has_tree(&id) {
self.tree_packer.add(&chunk, &id)?;
self.tree_packer.add(chunk.into(), id)?;
}
Ok(id)
}

View File

@ -7,11 +7,12 @@ use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use chrono::Local;
use crossbeam_channel::{bounded, Receiver, Sender};
use pariter::{scope, IteratorExt};
use zstd::encode_all;
use super::BlobType;
use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType};
use crate::crypto::{CryptoKey, Hasher};
use crate::crypto::{hash, CryptoKey};
use crate::id::Id;
use crate::index::SharedIndexer;
use crate::repofile::{
@ -72,10 +73,12 @@ impl PackSizer {
#[derive(Clone)]
pub struct Packer<BE: DecryptWriteBackend> {
// This is a hack: raw_packer and indexer are only used in the add_raw() method.
// TODO: Refactor as actor, like the other add() methods
raw_packer: Arc<RwLock<RawPacker<BE>>>,
key: BE::Key,
zstd: Option<i32>,
indexer: SharedIndexer<BE>,
sender: Sender<(Bytes, Id, Option<u32>)>,
finish: Receiver<Result<PackerStats>>,
}
impl<BE: DecryptWriteBackend> Packer<BE> {
@ -96,56 +99,76 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
)?));
let zstd = config.zstd()?;
Ok(Self {
raw_packer,
key,
zstd,
indexer,
})
let (tx, rx) = bounded(0);
let (finish_tx, finish_rx) = bounded::<Result<PackerStats>>(0);
let packer = Self {
raw_packer: raw_packer.clone(),
indexer: indexer.clone(),
sender: tx,
finish: finish_rx,
};
std::thread::spawn(move || {
scope(|scope| {
let status = rx
.into_iter()
.readahead_scoped(scope)
.filter(|(_, id, _)| !indexer.read().unwrap().has(id))
.filter(|(_, id, _)| !raw_packer.read().unwrap().has(id))
.readahead_scoped(scope)
.parallel_map_scoped(
scope,
|(data, id, size_limit): (Bytes, Id, Option<u32>)| {
let data_len: u32 = data.len().try_into()?;
let (data, uncompressed_length) = match zstd {
None => (
key.encrypt_data(&data)
.map_err(|_| anyhow!("crypto error"))?,
None,
),
// compress if requested
Some(level) => (
key.encrypt_data(&encode_all(&*data, level)?)
.map_err(|_| anyhow!("crypto error"))?,
NonZeroU32::new(data_len),
),
};
Ok((
data,
id,
u64::from(data_len),
uncompressed_length,
size_limit,
))
},
)
.readahead_scoped(scope)
.try_for_each(|item: Result<_>| {
let (data, id, data_len, ul, size_limit) = item?;
raw_packer
.write()
.unwrap()
.add_raw(&data, &id, data_len, ul, size_limit)
})
.and_then(|_| raw_packer.write().unwrap().finalize());
let _ = finish_tx.send(status);
})
.unwrap();
});
Ok(packer)
}
/// adds the blob to the packfile
pub fn add(&self, data: &[u8], id: &Id) -> Result<()> {
pub fn add(&self, data: Bytes, id: Id) -> Result<()> {
// compute size limit based on total size and size bounds
self.add_with_sizelimit(data, id, None)
}
/// adds the blob to the packfile, allows specifying a size limit for the pack file
pub fn add_with_sizelimit(&self, data: &[u8], id: &Id, size_limit: Option<u32>) -> Result<()> {
// only add if this blob is not present
if self.indexer.read().unwrap().has(id) {
// Note: This is within two if clauses , because here the indexer lock is already released.
// using "if self.indexer.read().unwrap().has(id) || self.raw_packer.read().unwrap().has(id)"
// can lead to a deadlock as the indexer lock is hold too long (and also needed within raw_packer!)
if self.raw_packer.read().unwrap().has(id) {
return Ok(());
}
}
let key = self.key.clone();
let zstd = self.zstd;
let data_len: u32 = data.len().try_into()?;
let (data, uncompressed_length) = match zstd {
None => (
key.encrypt_data(data)
.map_err(|_| anyhow!("crypto error"))?,
None,
),
// compress if requested
Some(level) => (
key.encrypt_data(&encode_all(data, level)?)
.map_err(|_| anyhow!("crypto error"))?,
NonZeroU32::new(data_len),
),
};
self.add_raw(
&data,
id,
u64::from(data_len),
uncompressed_length,
size_limit,
)
pub fn add_with_sizelimit(&self, data: Bytes, id: Id, size_limit: Option<u32>) -> Result<()> {
self.sender.send((data, id, size_limit))?;
Ok(())
}
/// adds the already encrypted (and maybe compressed) blob to the packfile
@ -172,7 +195,10 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
}
pub fn finalize(self) -> Result<PackerStats> {
self.raw_packer.write().unwrap().finalize()
// cancel channel
drop(self.sender);
// wait for items in channel to be processed
self.finish.recv().unwrap()
}
}
@ -210,8 +236,7 @@ pub struct RawPacker<BE: DecryptWriteBackend> {
count: u32,
created: SystemTime,
index: IndexPack,
hasher: Hasher,
file_writer: Option<Actor<(Bytes, Id, IndexPack)>>,
file_writer: Option<Actor>,
pack_sizer: PackSizer,
stats: PackerStats,
}
@ -242,7 +267,6 @@ impl<BE: DecryptWriteBackend> RawPacker<BE> {
count: 0,
created: SystemTime::now(),
index: IndexPack::default(),
hasher: Hasher::new(),
file_writer,
pack_sizer,
stats: PackerStats::default(),
@ -256,7 +280,6 @@ impl<BE: DecryptWriteBackend> RawPacker<BE> {
}
pub fn write_data(&mut self, data: &[u8]) -> Result<u32> {
self.hasher.update(data);
let len = data.len().try_into()?;
self.file.extend_from_slice(data);
self.size += len;
@ -272,10 +295,6 @@ impl<BE: DecryptWriteBackend> RawPacker<BE> {
uncompressed_length: Option<NonZeroU32>,
size_limit: Option<u32>,
) -> Result<()> {
if self.has(id) {
return Ok(());
}
self.stats.blobs += 1;
self.stats.data += data_len;
let data_len_packed: u64 = data.len().try_into()?;
@ -296,7 +315,6 @@ impl<BE: DecryptWriteBackend> RawPacker<BE> {
self.size = 0;
self.count = 0;
self.created = SystemTime::now();
self.hasher.reset();
}
Ok(())
}
@ -329,17 +347,13 @@ impl<BE: DecryptWriteBackend> RawPacker<BE> {
self.write_header()?;
// compute id of packfile
let id = self.hasher.finalize();
self.index.set_id(id);
// write file to backend
let index = std::mem::take(&mut self.index);
let file = std::mem::replace(&mut self.file, BytesMut::new());
self.file_writer
.as_ref()
.unwrap()
.send((file.into(), id, index))?;
.send((file.into(), index))?;
Ok(())
}
@ -356,44 +370,52 @@ struct FileWriterHandle<BE: DecryptWriteBackend> {
cacheable: bool,
}
impl<BE: DecryptWriteBackend> ActorHandle<(Bytes, Id, IndexPack)> for FileWriterHandle<BE> {
fn process(&self, load: (Bytes, Id, IndexPack)) -> Result<()> {
impl<BE: DecryptWriteBackend> FileWriterHandle<BE> {
fn process(&self, load: (Bytes, Id, IndexPack)) -> Result<IndexPack> {
let (file, id, mut index) = load;
index.set_id(id);
self.be
.write_bytes(FileType::Pack, &id, self.cacheable, file)?;
index.time = Some(Local::now());
Ok(index)
}
fn index(&self, index: IndexPack) -> Result<()> {
self.indexer.write().unwrap().add(index)?;
Ok(())
}
}
pub trait ActorHandle<T>: Clone + Send + 'static {
fn process(&self, load: T) -> Result<()>;
}
pub struct Actor<T> {
sender: Sender<T>,
pub struct Actor {
sender: Sender<(Bytes, IndexPack)>,
finish: Receiver<Result<()>>,
}
impl<T: Send + Sync + 'static> Actor<T> {
pub fn new(fwh: impl ActorHandle<T>, queue_len: usize, par: usize) -> Self {
impl Actor {
fn new<BE: DecryptWriteBackend>(
fwh: FileWriterHandle<BE>,
queue_len: usize,
_par: usize,
) -> Self {
let (tx, rx) = bounded(queue_len);
let (finish_tx, finish_rx) = bounded::<Result<()>>(0);
(0..par).for_each(|_| {
let rx = rx.clone();
let finish_tx = finish_tx.clone();
let fwh = fwh.clone();
std::thread::spawn(move || {
let mut status = Ok(());
for load in rx {
// only keep processing if there was no error
if status.is_ok() {
status = fwh.process(load);
}
}
std::thread::spawn(move || {
scope(|scope| {
let status = rx
.into_iter()
.readahead_scoped(scope)
.map(|(file, index): (Bytes, IndexPack)| {
let id = hash(&file);
(file, id, index)
})
.readahead_scoped(scope)
.map(|load| fwh.process(load))
.readahead_scoped(scope)
.try_for_each(|index| fwh.index(index?));
let _ = finish_tx.send(status);
});
})
.unwrap();
});
Self {
@ -402,7 +424,7 @@ impl<T: Send + Sync + 'static> Actor<T> {
}
}
pub fn send(&self, load: T) -> Result<()> {
pub fn send(&self, load: (Bytes, IndexPack)) -> Result<()> {
self.sender.send(load)?;
Ok(())
}
@ -466,7 +488,7 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
blob.uncompressed_length,
)?;
self.packer
.add_with_sizelimit(&data, &blob.id, Some(self.size_limit))?;
.add_with_sizelimit(data, blob.id, Some(self.size_limit))?;
Ok(())
}

View File

@ -110,7 +110,7 @@ fn copy(
trace!("copy tree blob {id}");
if !index_dest.has_tree(id) {
let data = index.get_tree(id).unwrap().read_data(index.be())?;
tree_packer.add(&data, id)?;
tree_packer.add(data, *id)?;
}
Ok(())
})?;
@ -130,7 +130,7 @@ fn copy(
trace!("copy data blob {id}");
if !index_dest.has_data(id) {
let data = index.get_data(id).unwrap().read_data(index.be())?;
data_packer.add(&data, id)?;
data_packer.add(data, *id)?;
}
Ok(())
})?;
@ -141,7 +141,7 @@ fn copy(
trace!("copy tree blob {id}");
if !index_dest.has_tree(&id) {
let data = index.get_tree(&id).unwrap().read_data(index.be())?;
tree_packer.add(&data, &id)?;
tree_packer.add(data, id)?;
}
}

View File

@ -82,7 +82,7 @@ pub(super) fn execute(
let (chunk, new_id) = tree.serialize()?;
let size = u64::try_from(chunk.len())?;
if !index.has_tree(&new_id) {
packer.add(&chunk, &new_id)?;
packer.add(chunk.into(), new_id)?;
}
Ok((new_id, size))
};

View File

@ -364,7 +364,7 @@ fn repair_tree<BE: DecryptWriteBackend>(
// the tree has been changed => save it
let (chunk, new_id) = tree.serialize()?;
if !be.has_tree(&new_id) && !gopts.dry_run {
packer.add(&chunk, &new_id)?;
packer.add(chunk.into(), new_id)?;
}
if let Some(id) = id {
replaced.insert(id, (c, new_id));

View File

@ -5,23 +5,3 @@ use crate::id::Id;
pub fn hash(data: &[u8]) -> Id {
Id::new(Sha256::digest(data).into())
}
pub struct Hasher(Sha256);
impl Hasher {
pub fn new() -> Self {
Self(Sha256::new())
}
pub fn reset(&mut self) {
self.0.reset();
}
pub fn update(&mut self, data: &[u8]) {
self.0.update(data);
}
pub fn finalize(&mut self) -> Id {
Id::new(self.0.finalize_reset().into())
}
}