diff --git a/Cargo.lock b/Cargo.lock index fbae22a..130d947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,6 +448,20 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -482,6 +496,16 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.15" @@ -1479,6 +1503,17 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "pariter" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "324a62b9e7b5f270c0acc92a2040f8028bb643f959f9c068f11a7864f327e3d9" +dependencies = [ + "crossbeam", + "crossbeam-channel", + "num_cpus", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1923,6 +1958,7 @@ dependencies = [ "merge", "nix", "nom", + "pariter", "path-dedot", "quickcheck", "quickcheck_macros", diff --git a/Cargo.toml b/Cargo.toml index c7bfe11..9f899de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ log = "0.4" # parallelize crossbeam-channel = "0.5" rayon = "1" +pariter = "0.5" #crypto aes256ctr_poly1305aes = "0.1" rand = "0.8" diff --git a/changelog/new.txt b/changelog/new.txt index d389443..54633a4 100644 --- a/changelog/new.txt +++ b/changelog/new.txt @@ -6,6 +6,7 @@ Bugs fixed: - restore: Warm-up options given by the command line didn't work. This has been fixed. New features: +- backup: Backing up (small) files is now much more parallelized. - prune: Added option --repack-all - Option --dry-run is now a global option and can also be defined in the config file or via env variable - Updated to clap v4 diff --git a/src/archiver/file_archiver.rs b/src/archiver/file_archiver.rs index 95fc90c..f8f6ac4 100644 --- a/src/archiver/file_archiver.rs +++ b/src/archiver/file_archiver.rs @@ -2,7 +2,6 @@ use std::io::Read; use anyhow::{anyhow, Result}; use indicatif::ProgressBar; -use rayon::prelude::*; use crate::backend::{DecryptWriteBackend, ReadSourceOpen}; use crate::blob::{BlobType, Node, NodeType, Packer, PackerStats}; @@ -69,28 +68,22 @@ impl FileArchiver { node: Node, p: ProgressBar, ) -> Result<(Node, u64)> { - let mut chunks: Vec<_> = - ChunkIter::new(r, *node.meta().size() as usize, self.rabin.clone()) - .enumerate() // see below - .par_bridge() - .map(|(num, chunk)| { - let chunk = chunk?; - let id = hash(&chunk); - let size = chunk.len() as u64; + let chunks: Vec<_> = ChunkIter::new(r, *node.meta().size() as usize, self.rabin.clone()) + .map(|chunk| { + let chunk = chunk?; + let id = hash(&chunk); + let size = chunk.len() as u64; - if !self.index.has_data(&id) { - self.data_packer.add(&chunk, &id)?; - } - p.inc(size); - Ok((num, id, size)) - }) - .collect::>()?; + if !self.index.has_data(&id) { + self.data_packer.add(chunk.into(), id)?; + } + p.inc(size); + Ok((id, size)) + }) + .collect::>()?; - // As par_bridge doesn't guarantee to keep the order, we sort by the enumeration - chunks.sort_unstable_by_key(|x| x.0); - - let filesize = chunks.iter().map(|x| x.2).sum(); - let content = chunks.into_iter().map(|x| x.1).collect(); + let filesize = chunks.iter().map(|x| x.1).sum(); + let content = chunks.into_iter().map(|x| x.0).collect(); let mut node = node; node.set_content(content); diff --git a/src/archiver/mod.rs b/src/archiver/mod.rs index 2910817..a87ba61 100644 --- a/src/archiver/mod.rs +++ b/src/archiver/mod.rs @@ -14,6 +14,7 @@ use anyhow::Result; use chrono::Local; use indicatif::ProgressBar; use log::*; +use pariter::{scope, IteratorExt}; use crate::backend::{DecryptWriteBackend, ReadSource, ReadSourceEntry}; use crate::blob::BlobType; @@ -59,13 +60,18 @@ impl Archiver { }) } - pub fn archive( + pub fn archive( mut self, - src: impl ReadSource, + src: R, backup_path: &Path, as_path: Option<&PathBuf>, p: &ProgressBar, - ) -> Result { + ) -> Result + where + R: ReadSource + 'static, + ::Open: Send, + ::Iter: Send, + { if !p.is_hidden() { if let Some(size) = src.size()? { p.set_length(size); @@ -104,28 +110,28 @@ impl Archiver { // handle beginning and ending of trees let iter = TreeIterator::new(iter); - // use parent snapshot - let iter = iter.filter_map(|item| match self.parent.process(item) { - Ok(item) => Some(item), - Err(err) => { - warn!("ignoring error reading parent snapshot: {err:?}"); - None - } - }); - - // archive files - let iter = iter.filter_map(|item| match self.file_archiver.process(item, p.clone()) { - Ok(item) => Some(item), - Err(err) => { - warn!("ignoring error: {err:?}"); - None - } - }); - - // save items in trees - for item in iter { - self.tree_archiver.add(item)?; - } + scope(|scope| -> Result<_> { + // use parent snapshot + iter.filter_map(|item| match self.parent.process(item) { + Ok(item) => Some(item), + Err(err) => { + warn!("ignoring error reading parent snapshot: {err:?}"); + None + } + }) + // archive files in parallel + .parallel_map_scoped(scope, |item| self.file_archiver.process(item, p.clone())) + .readahead_scoped(scope) + .filter_map(|item| match item { + Ok(item) => Some(item), + Err(err) => { + warn!("ignoring error: {err:?}"); + None + } + }) + .try_for_each(|item| self.tree_archiver.add(item)) + }) + .unwrap()?; let stats = self.file_archiver.finalize()?; let (id, mut summary) = self.tree_archiver.finalize(self.parent_tree)?; diff --git a/src/archiver/tree_archiver.rs b/src/archiver/tree_archiver.rs index 83786cc..a21b584 100644 --- a/src/archiver/tree_archiver.rs +++ b/src/archiver/tree_archiver.rs @@ -122,7 +122,7 @@ impl TreeArchiver { } if !self.index.has_tree(&id) { - self.tree_packer.add(&chunk, &id)?; + self.tree_packer.add(chunk.into(), id)?; } Ok(id) } diff --git a/src/blob/packer.rs b/src/blob/packer.rs index f9a6648..e3494ce 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -7,11 +7,12 @@ use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use chrono::Local; use crossbeam_channel::{bounded, Receiver, Sender}; +use pariter::{scope, IteratorExt}; use zstd::encode_all; use super::BlobType; use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType}; -use crate::crypto::{CryptoKey, Hasher}; +use crate::crypto::{hash, CryptoKey}; use crate::id::Id; use crate::index::SharedIndexer; use crate::repofile::{ @@ -72,10 +73,12 @@ impl PackSizer { #[derive(Clone)] pub struct Packer { + // This is a hack: raw_packer and indexer are only used in the add_raw() method. + // TODO: Refactor as actor, like the other add() methods raw_packer: Arc>>, - key: BE::Key, - zstd: Option, indexer: SharedIndexer, + sender: Sender<(Bytes, Id, Option)>, + finish: Receiver>, } impl Packer { @@ -96,56 +99,76 @@ impl Packer { )?)); let zstd = config.zstd()?; - Ok(Self { - raw_packer, - key, - zstd, - indexer, - }) + let (tx, rx) = bounded(0); + let (finish_tx, finish_rx) = bounded::>(0); + let packer = Self { + raw_packer: raw_packer.clone(), + indexer: indexer.clone(), + sender: tx, + finish: finish_rx, + }; + + std::thread::spawn(move || { + scope(|scope| { + let status = rx + .into_iter() + .readahead_scoped(scope) + .filter(|(_, id, _)| !indexer.read().unwrap().has(id)) + .filter(|(_, id, _)| !raw_packer.read().unwrap().has(id)) + .readahead_scoped(scope) + .parallel_map_scoped( + scope, + |(data, id, size_limit): (Bytes, Id, Option)| { + let data_len: u32 = data.len().try_into()?; + let (data, uncompressed_length) = match zstd { + None => ( + key.encrypt_data(&data) + .map_err(|_| anyhow!("crypto error"))?, + None, + ), + // compress if requested + Some(level) => ( + key.encrypt_data(&encode_all(&*data, level)?) + .map_err(|_| anyhow!("crypto error"))?, + NonZeroU32::new(data_len), + ), + }; + Ok(( + data, + id, + u64::from(data_len), + uncompressed_length, + size_limit, + )) + }, + ) + .readahead_scoped(scope) + .try_for_each(|item: Result<_>| { + let (data, id, data_len, ul, size_limit) = item?; + raw_packer + .write() + .unwrap() + .add_raw(&data, &id, data_len, ul, size_limit) + }) + .and_then(|_| raw_packer.write().unwrap().finalize()); + let _ = finish_tx.send(status); + }) + .unwrap(); + }); + + Ok(packer) } /// adds the blob to the packfile - pub fn add(&self, data: &[u8], id: &Id) -> Result<()> { + pub fn add(&self, data: Bytes, id: Id) -> Result<()> { // compute size limit based on total size and size bounds self.add_with_sizelimit(data, id, None) } /// adds the blob to the packfile, allows specifying a size limit for the pack file - pub fn add_with_sizelimit(&self, data: &[u8], id: &Id, size_limit: Option) -> Result<()> { - // only add if this blob is not present - if self.indexer.read().unwrap().has(id) { - // Note: This is within two if clauses , because here the indexer lock is already released. - // using "if self.indexer.read().unwrap().has(id) || self.raw_packer.read().unwrap().has(id)" - // can lead to a deadlock as the indexer lock is hold too long (and also needed within raw_packer!) - if self.raw_packer.read().unwrap().has(id) { - return Ok(()); - } - } - - let key = self.key.clone(); - let zstd = self.zstd; - - let data_len: u32 = data.len().try_into()?; - let (data, uncompressed_length) = match zstd { - None => ( - key.encrypt_data(data) - .map_err(|_| anyhow!("crypto error"))?, - None, - ), - // compress if requested - Some(level) => ( - key.encrypt_data(&encode_all(data, level)?) - .map_err(|_| anyhow!("crypto error"))?, - NonZeroU32::new(data_len), - ), - }; - self.add_raw( - &data, - id, - u64::from(data_len), - uncompressed_length, - size_limit, - ) + pub fn add_with_sizelimit(&self, data: Bytes, id: Id, size_limit: Option) -> Result<()> { + self.sender.send((data, id, size_limit))?; + Ok(()) } /// adds the already encrypted (and maybe compressed) blob to the packfile @@ -172,7 +195,10 @@ impl Packer { } pub fn finalize(self) -> Result { - self.raw_packer.write().unwrap().finalize() + // cancel channel + drop(self.sender); + // wait for items in channel to be processed + self.finish.recv().unwrap() } } @@ -210,8 +236,7 @@ pub struct RawPacker { count: u32, created: SystemTime, index: IndexPack, - hasher: Hasher, - file_writer: Option>, + file_writer: Option, pack_sizer: PackSizer, stats: PackerStats, } @@ -242,7 +267,6 @@ impl RawPacker { count: 0, created: SystemTime::now(), index: IndexPack::default(), - hasher: Hasher::new(), file_writer, pack_sizer, stats: PackerStats::default(), @@ -256,7 +280,6 @@ impl RawPacker { } pub fn write_data(&mut self, data: &[u8]) -> Result { - self.hasher.update(data); let len = data.len().try_into()?; self.file.extend_from_slice(data); self.size += len; @@ -272,10 +295,6 @@ impl RawPacker { uncompressed_length: Option, size_limit: Option, ) -> Result<()> { - if self.has(id) { - return Ok(()); - } - self.stats.blobs += 1; self.stats.data += data_len; let data_len_packed: u64 = data.len().try_into()?; @@ -296,7 +315,6 @@ impl RawPacker { self.size = 0; self.count = 0; self.created = SystemTime::now(); - self.hasher.reset(); } Ok(()) } @@ -329,17 +347,13 @@ impl RawPacker { self.write_header()?; - // compute id of packfile - let id = self.hasher.finalize(); - self.index.set_id(id); - // write file to backend let index = std::mem::take(&mut self.index); let file = std::mem::replace(&mut self.file, BytesMut::new()); self.file_writer .as_ref() .unwrap() - .send((file.into(), id, index))?; + .send((file.into(), index))?; Ok(()) } @@ -356,44 +370,52 @@ struct FileWriterHandle { cacheable: bool, } -impl ActorHandle<(Bytes, Id, IndexPack)> for FileWriterHandle { - fn process(&self, load: (Bytes, Id, IndexPack)) -> Result<()> { +impl FileWriterHandle { + fn process(&self, load: (Bytes, Id, IndexPack)) -> Result { let (file, id, mut index) = load; + index.set_id(id); self.be .write_bytes(FileType::Pack, &id, self.cacheable, file)?; index.time = Some(Local::now()); + Ok(index) + } + + fn index(&self, index: IndexPack) -> Result<()> { self.indexer.write().unwrap().add(index)?; Ok(()) } } -pub trait ActorHandle: Clone + Send + 'static { - fn process(&self, load: T) -> Result<()>; -} - -pub struct Actor { - sender: Sender, +pub struct Actor { + sender: Sender<(Bytes, IndexPack)>, finish: Receiver>, } -impl Actor { - pub fn new(fwh: impl ActorHandle, queue_len: usize, par: usize) -> Self { +impl Actor { + fn new( + fwh: FileWriterHandle, + queue_len: usize, + _par: usize, + ) -> Self { let (tx, rx) = bounded(queue_len); let (finish_tx, finish_rx) = bounded::>(0); - (0..par).for_each(|_| { - let rx = rx.clone(); - let finish_tx = finish_tx.clone(); - let fwh = fwh.clone(); - std::thread::spawn(move || { - let mut status = Ok(()); - for load in rx { - // only keep processing if there was no error - if status.is_ok() { - status = fwh.process(load); - } - } + + std::thread::spawn(move || { + scope(|scope| { + let status = rx + .into_iter() + .readahead_scoped(scope) + .map(|(file, index): (Bytes, IndexPack)| { + let id = hash(&file); + (file, id, index) + }) + .readahead_scoped(scope) + .map(|load| fwh.process(load)) + .readahead_scoped(scope) + .try_for_each(|index| fwh.index(index?)); let _ = finish_tx.send(status); - }); + }) + .unwrap(); }); Self { @@ -402,7 +424,7 @@ impl Actor { } } - pub fn send(&self, load: T) -> Result<()> { + pub fn send(&self, load: (Bytes, IndexPack)) -> Result<()> { self.sender.send(load)?; Ok(()) } @@ -466,7 +488,7 @@ impl Repacker { blob.uncompressed_length, )?; self.packer - .add_with_sizelimit(&data, &blob.id, Some(self.size_limit))?; + .add_with_sizelimit(data, blob.id, Some(self.size_limit))?; Ok(()) } diff --git a/src/commands/copy.rs b/src/commands/copy.rs index 3ffd86e..2a0feea 100644 --- a/src/commands/copy.rs +++ b/src/commands/copy.rs @@ -110,7 +110,7 @@ fn copy( trace!("copy tree blob {id}"); if !index_dest.has_tree(id) { let data = index.get_tree(id).unwrap().read_data(index.be())?; - tree_packer.add(&data, id)?; + tree_packer.add(data, *id)?; } Ok(()) })?; @@ -130,7 +130,7 @@ fn copy( trace!("copy data blob {id}"); if !index_dest.has_data(id) { let data = index.get_data(id).unwrap().read_data(index.be())?; - data_packer.add(&data, id)?; + data_packer.add(data, *id)?; } Ok(()) })?; @@ -141,7 +141,7 @@ fn copy( trace!("copy tree blob {id}"); if !index_dest.has_tree(&id) { let data = index.get_tree(&id).unwrap().read_data(index.be())?; - tree_packer.add(&data, &id)?; + tree_packer.add(data, id)?; } } diff --git a/src/commands/merge_cmd.rs b/src/commands/merge_cmd.rs index cea9feb..b327fba 100644 --- a/src/commands/merge_cmd.rs +++ b/src/commands/merge_cmd.rs @@ -82,7 +82,7 @@ pub(super) fn execute( let (chunk, new_id) = tree.serialize()?; let size = u64::try_from(chunk.len())?; if !index.has_tree(&new_id) { - packer.add(&chunk, &new_id)?; + packer.add(chunk.into(), new_id)?; } Ok((new_id, size)) }; diff --git a/src/commands/repair.rs b/src/commands/repair.rs index 832e852..7f780de 100644 --- a/src/commands/repair.rs +++ b/src/commands/repair.rs @@ -364,7 +364,7 @@ fn repair_tree( // the tree has been changed => save it let (chunk, new_id) = tree.serialize()?; if !be.has_tree(&new_id) && !gopts.dry_run { - packer.add(&chunk, &new_id)?; + packer.add(chunk.into(), new_id)?; } if let Some(id) = id { replaced.insert(id, (c, new_id)); diff --git a/src/crypto/hasher.rs b/src/crypto/hasher.rs index 9459338..4d03839 100644 --- a/src/crypto/hasher.rs +++ b/src/crypto/hasher.rs @@ -5,23 +5,3 @@ use crate::id::Id; pub fn hash(data: &[u8]) -> Id { Id::new(Sha256::digest(data).into()) } - -pub struct Hasher(Sha256); - -impl Hasher { - pub fn new() -> Self { - Self(Sha256::new()) - } - - pub fn reset(&mut self) { - self.0.reset(); - } - - pub fn update(&mut self, data: &[u8]) { - self.0.update(data); - } - - pub fn finalize(&mut self) -> Id { - Id::new(self.0.finalize_reset().into()) - } -}