backup: Use rayon to parallelize hashing

This commit is contained in:
Alexander Weiss 2022-12-19 00:49:33 +01:00
parent f049b01864
commit fac991d6ac
4 changed files with 19 additions and 62 deletions

36
Cargo.lock generated
View File

@ -362,20 +362,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
@ -410,16 +396,6 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
@ -1330,17 +1306,6 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "pariter"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9"
dependencies = [
"crossbeam",
"crossbeam-channel",
"num_cpus",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1737,7 +1702,6 @@ dependencies = [
"log",
"merge",
"nix",
"pariter",
"path-dedot",
"quickcheck",
"quickcheck_macros",

View File

@ -33,7 +33,6 @@ log = "0.4"
# parallelize
crossbeam-channel = "0.5"
rayon = "1"
pariter = "0.5"
#crypto
aes256ctr_poly1305aes = "0.1"
sha2 = "0.10"

View File

@ -7,7 +7,7 @@ use bytesize::ByteSize;
use chrono::Local;
use indicatif::ProgressBar;
use log::*;
use pariter::IteratorExt;
use rayon::prelude::*;
use crate::backend::DecryptWriteBackend;
use crate::blob::{BlobType, Metadata, Node, NodeType, Packer, Tree};
@ -227,37 +227,31 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
pub fn backup_reader(
&mut self,
r: impl Read + 'static,
r: impl Read + Send + 'static,
node: Node,
p: ProgressBar,
) -> Result<()> {
let chunk_iter = ChunkIter::new(r, *node.meta().size() as usize, &self.poly);
let mut content = Vec::new();
let mut filesize: u64 = 0;
let index = self.index.clone();
let data_packer = self.data_packer.clone();
chunk_iter
.into_iter()
// TODO: This parallelization works pretty well for big files. For small files this produces a lot of
// unneccessary overhead. Maybe use a parallel hashing actor?
.parallel_map(move |chunk| {
let mut chunks: Vec<_> = ChunkIter::new(r, *node.meta().size() as usize, &self.poly)
.enumerate() // see below
.par_bridge()
.map(|(num, chunk)| {
let chunk = chunk?;
let id = hash(&chunk);
let size = chunk.len() as u64;
if !index.has_data(&id) {
data_packer.add(&chunk, &id)?
if !self.index.has_data(&id) {
self.data_packer.add(&chunk, &id)?
}
p.inc(size);
Ok((id, size))
Ok((num, id, size))
})
.try_for_each(|data: Result<_>| -> Result<_> {
let (id, size) = data?;
content.push(id);
filesize += size;
Ok(())
})?;
.collect::<Result<_>>()?;
// As par_bridge doesn't guarantee to keep the order, we sort by the enumeration
chunks.par_sort_unstable_by_key(|x| x.0);
let filesize = chunks.iter().map(|x| x.2).sum();
let content = chunks.into_iter().map(|x| x.1).collect();
let mut node = node;
node.set_content(content);

View File

@ -16,7 +16,7 @@ fn default_predicate(x: u64) -> bool {
(x & SPLITMASK) == 0
}
pub struct ChunkIter<R: Read> {
pub struct ChunkIter<R: Read + Send> {
buf: Vec<u8>,
pos: usize,
reader: R,
@ -28,7 +28,7 @@ pub struct ChunkIter<R: Read> {
finished: bool,
}
impl<R: Read> ChunkIter<R> {
impl<R: Read + Send> ChunkIter<R> {
pub fn new(reader: R, size_hint: usize, poly: &Polynom64) -> Self {
Self {
buf: Vec::with_capacity(4 * KB),
@ -44,7 +44,7 @@ impl<R: Read> ChunkIter<R> {
}
}
impl<R: Read> Iterator for ChunkIter<R> {
impl<R: Read + Send> Iterator for ChunkIter<R> {
type Item = io::Result<Vec<u8>>;
fn next(&mut self) -> Option<io::Result<Vec<u8>>> {