From fac991d6ac0dcba776e68d1adb137d221e27f8b7 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Mon, 19 Dec 2022 00:49:33 +0100 Subject: [PATCH] backup: Use rayon to parallelize hashing --- Cargo.lock | 36 --------------------------------- Cargo.toml | 1 - src/archiver/archiver_impl.rs | 38 +++++++++++++++-------------------- src/chunker.rs | 6 +++--- 4 files changed, 19 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7e8b85..7b0ff90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,20 +362,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -410,16 +396,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -1330,17 +1306,6 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" -[[package]] -name = "pariter" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9" -dependencies = [ - "crossbeam", - "crossbeam-channel", - "num_cpus", -] - [[package]] name = "parking_lot" version = "0.12.1" @@ -1737,7 +1702,6 @@ dependencies = [ "log", "merge", "nix", - "pariter", "path-dedot", "quickcheck", "quickcheck_macros", diff --git a/Cargo.toml b/Cargo.toml index 283e129..6647f05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,6 @@ log = "0.4" # parallelize crossbeam-channel = "0.5" rayon = "1" -pariter = "0.5" #crypto aes256ctr_poly1305aes = "0.1" sha2 = "0.10" diff --git a/src/archiver/archiver_impl.rs b/src/archiver/archiver_impl.rs index 70cacc5..cbf3dd1 100644 --- a/src/archiver/archiver_impl.rs +++ b/src/archiver/archiver_impl.rs @@ -7,7 +7,7 @@ use bytesize::ByteSize; use chrono::Local; use indicatif::ProgressBar; use log::*; -use pariter::IteratorExt; +use rayon::prelude::*; use crate::backend::DecryptWriteBackend; use crate::blob::{BlobType, Metadata, Node, NodeType, Packer, Tree}; @@ -227,37 +227,31 @@ impl Archiver { pub fn backup_reader( &mut self, - r: impl Read + 'static, + r: impl Read + Send + 'static, node: Node, p: ProgressBar, ) -> Result<()> { - let chunk_iter = ChunkIter::new(r, *node.meta().size() as usize, &self.poly); - let mut content = Vec::new(); - let mut filesize: u64 = 0; - let index = self.index.clone(); - let data_packer = self.data_packer.clone(); - - chunk_iter - .into_iter() - // TODO: This parallelization works pretty well for big files. For small files this produces a lot of - // unneccessary overhead. Maybe use a parallel hashing actor? - .parallel_map(move |chunk| { + let mut chunks: Vec<_> = ChunkIter::new(r, *node.meta().size() as usize, &self.poly) + .enumerate() // see below + .par_bridge() + .map(|(num, chunk)| { let chunk = chunk?; let id = hash(&chunk); let size = chunk.len() as u64; - if !index.has_data(&id) { - data_packer.add(&chunk, &id)? + if !self.index.has_data(&id) { + self.data_packer.add(&chunk, &id)? } p.inc(size); - Ok((id, size)) + Ok((num, id, size)) }) - .try_for_each(|data: Result<_>| -> Result<_> { - let (id, size) = data?; - content.push(id); - filesize += size; - Ok(()) - })?; + .collect::>()?; + + // As par_bridge doesn't guarantee to keep the order, we sort by the enumeration + chunks.par_sort_unstable_by_key(|x| x.0); + + let filesize = chunks.iter().map(|x| x.2).sum(); + let content = chunks.into_iter().map(|x| x.1).collect(); let mut node = node; node.set_content(content); diff --git a/src/chunker.rs b/src/chunker.rs index 0804e8b..3716b92 100644 --- a/src/chunker.rs +++ b/src/chunker.rs @@ -16,7 +16,7 @@ fn default_predicate(x: u64) -> bool { (x & SPLITMASK) == 0 } -pub struct ChunkIter { +pub struct ChunkIter { buf: Vec, pos: usize, reader: R, @@ -28,7 +28,7 @@ pub struct ChunkIter { finished: bool, } -impl ChunkIter { +impl ChunkIter { pub fn new(reader: R, size_hint: usize, poly: &Polynom64) -> Self { Self { buf: Vec::with_capacity(4 * KB), @@ -44,7 +44,7 @@ impl ChunkIter { } } -impl Iterator for ChunkIter { +impl Iterator for ChunkIter { type Item = io::Result>; fn next(&mut self) -> Option>> {