diff --git a/src/archiver/archiver_impl.rs b/src/archiver/archiver_impl.rs index 7e6130e..b053e9d 100644 --- a/src/archiver/archiver_impl.rs +++ b/src/archiver/archiver_impl.rs @@ -188,7 +188,7 @@ impl Archiver { } if !self.index.has_tree(&id) { - match self.tree_packer.add(&chunk, &id).await? { + match self.tree_packer.add(&chunk, &id)? { 0 => {} packed_size => { self.summary.tree_blobs += 1; @@ -242,13 +242,13 @@ impl Archiver { if queue.len() > 8 { let (id, chunk, size) = queue.next().await.unwrap()?; - self.process_data_junk(id, &chunk, size, &p).await?; + self.process_data_junk(id, &chunk, size, &p)?; content.push(id); } } while let Some(Ok((id, chunk, size))) = queue.next().await { - self.process_data_junk(id, &chunk, size, &p).await?; + self.process_data_junk(id, &chunk, size, &p)?; content.push(id); } @@ -258,7 +258,7 @@ impl Archiver { Ok(()) } - async fn process_data_junk( + fn process_data_junk( &mut self, id: Id, chunk: &[u8], @@ -266,7 +266,7 @@ impl Archiver { p: &ProgressBar, ) -> Result<()> { if !self.index.has_data(&id) { - match self.data_packer.add(chunk, &id).await? { + match self.data_packer.add(chunk, &id)? { 0 => {} packed_size => { self.summary.data_blobs += 1; @@ -286,14 +286,14 @@ impl Archiver { let (chunk, id) = self.tree.serialize()?; if !self.index.has_tree(&id) { - self.tree_packer.add(&chunk, &id).await?; + self.tree_packer.add(&chunk, &id)?; } self.snap.tree = id; - self.data_packer.finalize().await?; - self.tree_packer.finalize().await?; + self.data_packer.finalize()?; + self.tree_packer.finalize()?; { - let indexer = self.indexer.write().await; + let indexer = self.indexer.write().unwrap(); indexer.finalize()?; } let end_time = Local::now(); diff --git a/src/blob/packer.rs b/src/blob/packer.rs index 568f3e7..a52c503 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -5,7 +5,7 @@ use std::time::{Duration, SystemTime}; use anyhow::{anyhow, Result}; use bytes::{Bytes, BytesMut}; use chrono::Local; -use tokio::{spawn, task::JoinHandle}; +use crossbeam_channel::{bounded, Receiver, Sender}; use zstd::encode_all; use super::BlobType; @@ -74,7 +74,7 @@ pub struct Packer { index: IndexPack, indexer: SharedIndexer, hasher: Hasher, - file_writer: FileWriter, + file_writer: Actor<(Bytes, Id, IndexPack)>, zstd: Option, pack_sizer: PackSizer, } @@ -87,12 +87,15 @@ impl Packer { config: &ConfigFile, total_size: u64, ) -> Result { - let file_writer = FileWriter { - future: None, - be: be.clone(), - indexer: indexer.clone(), - cacheable: blob_type.is_cacheable(), - }; + let file_writer = Actor::new( + FileWriterHandle { + be: be.clone(), + indexer: indexer.clone(), + cacheable: blob_type.is_cacheable(), + }, + 1, + 1, + ); let zstd = config.zstd()?; let pack_sizer = PackSizer::from_config(config, blob_type, total_size); Ok(Self { @@ -111,12 +114,13 @@ impl Packer { }) } - pub async fn finalize(&mut self) -> Result<()> { - self.save().await?; - self.file_writer.finalize().await + pub fn finalize(mut self) -> Result<()> { + self.save()?; + self.file_writer.finalize()?; + Ok(()) } - pub async fn write_data(&mut self, data: &[u8]) -> Result { + pub fn write_data(&mut self, data: &[u8]) -> Result { self.hasher.update(data); let len = data.len().try_into()?; self.file.extend_from_slice(data); @@ -125,25 +129,20 @@ impl Packer { } // adds the blob to the packfile; returns the actually added size - pub async fn add(&mut self, data: &[u8], id: &Id) -> Result { + pub fn add(&mut self, data: &[u8], id: &Id) -> Result { // compute size limit based on total size and size bounds let size_limit = self.pack_sizer.pack_size(); - self.add_with_sizelimit(data, id, size_limit).await + self.add_with_sizelimit(data, id, size_limit) } // adds the blob to the packfile; returns the actually added size - pub async fn add_with_sizelimit( - &mut self, - data: &[u8], - id: &Id, - size_limit: u32, - ) -> Result { + pub fn add_with_sizelimit(&mut self, data: &[u8], id: &Id, size_limit: u32) -> Result { // only add if this blob is not present if self.has(id) { return Ok(0); } { - let indexer = self.indexer.read().await; + let indexer = self.indexer.read().unwrap(); if indexer.has(id) { return Ok(0); } @@ -167,13 +166,12 @@ impl Packer { }; // add using current total_size as repo_size - self.add_raw(&data, id, uncompressed_length, size_limit) - .await?; + self.add_raw(&data, id, uncompressed_length, size_limit)?; Ok(data.len().try_into()?) } // adds the already compressed/encrypted blob to the packfile without any check - pub async fn add_raw( + pub fn add_raw( &mut self, data: &[u8], id: &Id, @@ -181,7 +179,7 @@ impl Packer { size_limit: u32, ) -> Result<()> { let offset = self.size; - let len = self.write_data(data).await?; + let len = self.write_data(data)?; self.index .add(*id, self.blob_type, offset, len, uncompressed_length); self.count += 1; @@ -190,7 +188,7 @@ impl Packer { if self.count >= MAX_COUNT || self.size >= size_limit || self.created.elapsed()? >= MAX_AGE { self.pack_sizer.add_size(self.index.pack_size()); - self.save().await?; + self.save()?; self.size = 0; self.count = 0; self.created = SystemTime::now(); @@ -200,7 +198,7 @@ impl Packer { } /// writes header and length of header to packfile - pub async fn write_header(&mut self) -> Result<()> { + pub fn write_header(&mut self) -> Result<()> { // comput the pack header let data = PackHeaderRef::from_index_pack(&self.index).to_binary()?; @@ -212,21 +210,20 @@ impl Packer { .map_err(|_| anyhow!("crypto error"))?; let headerlen = data.len().try_into()?; - self.write_data(&data).await?; + self.write_data(&data)?; // finally write length of header unencrypted to pack file - self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?) - .await?; + self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?)?; Ok(()) } - pub async fn save(&mut self) -> Result<()> { + pub fn save(&mut self) -> Result<()> { if self.size == 0 { return Ok(()); } - self.write_header().await?; + self.write_header()?; // compute id of packfile let id = self.hasher.finalize(); @@ -235,7 +232,7 @@ impl Packer { // write file to backend let index = std::mem::take(&mut self.index); let file = std::mem::replace(&mut self.file, BytesMut::new()); - self.file_writer.add(index, file.into(), id).await?; + self.file_writer.send((file.into(), id, index))?; Ok(()) } @@ -245,36 +242,69 @@ impl Packer { } } -struct FileWriter { - future: Option>>, +#[derive(Clone)] +struct FileWriterHandle { be: BE, indexer: SharedIndexer, cacheable: bool, } -impl FileWriter { - async fn add(&mut self, mut index: IndexPack, file: Bytes, id: Id) -> Result<()> { - let be = self.be.clone(); - let indexer = self.indexer.clone(); - let cacheable = self.cacheable; - let new_future = spawn(async move { - be.write_bytes(FileType::Pack, &id, cacheable, file)?; - index.time = Some(Local::now()); - indexer.write().await.add(index)?; - Ok(()) +impl ActorHandle<(Bytes, Id, IndexPack)> for FileWriterHandle { + fn process(&self, load: (Bytes, Id, IndexPack)) -> Result<()> { + let (file, id, mut index) = load; + self.be + .write_bytes(FileType::Pack, &id, self.cacheable, file)?; + index.time = Some(Local::now()); + self.indexer.write().unwrap().add(index)?; + Ok(()) + } +} + +pub trait ActorHandle: Clone + Send + 'static { + fn process(&self, load: T) -> Result<()>; +} + +pub struct Actor { + sender: Sender, + finish: Receiver>, +} + +impl Actor { + pub fn new(fwh: impl ActorHandle, queue_len: usize, par: usize) -> Self { + let (tx, rx) = bounded(queue_len); + let (finish_tx, finish_rx) = bounded::>(0); + (0..par).for_each(|_| { + let rx = rx.clone(); + let finish_tx = finish_tx.clone(); + let fwh = fwh.clone(); + std::thread::spawn(move || { + let mut status = Ok(()); + for load in rx { + // only keep processing if there was no error + if status.is_ok() { + status = fwh.process(load); + } + } + let _ = finish_tx.send(status); + }); }); - if let Some(fut) = self.future.replace(new_future) { - fut.await??; + Self { + sender: tx, + finish: finish_rx, } + } + + pub fn send(&self, load: T) -> Result<()> { + self.sender.send(load)?; Ok(()) } - async fn finalize(&mut self) -> Result<()> { - if let Some(fut) = self.future.take() { - return fut.await?; - } - Ok(()) + pub fn finalize(self) -> Result<()> { + // cancel channel + drop(self.sender); + // wait for items in channel to be processed + self.finish.recv().unwrap() } } @@ -301,7 +331,7 @@ impl Repacker { }) } - pub async fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { + pub fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { let data = self.be.read_partial( FileType::Pack, pack_id, @@ -310,12 +340,11 @@ impl Repacker { blob.length, )?; self.packer - .add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit) - .await?; + .add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit)?; Ok(()) } - pub async fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { + pub fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { let data = self.be.read_encrypted_partial( FileType::Pack, pack_id, @@ -325,12 +354,11 @@ impl Repacker { blob.uncompressed_length, )?; self.packer - .add_with_sizelimit(&data, &blob.id, self.size_limit) - .await?; + .add_with_sizelimit(&data, &blob.id, self.size_limit)?; Ok(()) } - pub async fn finalize(&mut self) -> Result<()> { - self.packer.finalize().await + pub fn finalize(self) -> Result<()> { + self.packer.finalize() } } diff --git a/src/commands/prune.rs b/src/commands/prune.rs index 804b304..407827c 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -173,7 +173,7 @@ pub(super) async fn execute( wait(opts.warm_up_wait).await; if !opts.dry_run { - pruner.do_prune(be, opts, config).await?; + pruner.do_prune(be, opts, config)?; } Ok(()) } @@ -828,7 +828,7 @@ impl Pruner { .collect() } - async fn do_prune( + fn do_prune( mut self, be: &impl DecryptFullBackend, opts: Opts, @@ -888,7 +888,7 @@ impl Pruner { time: Some(Local::now()), blobs: Vec::new(), }; - indexer.write().await.add_remove(pack)?; + indexer.write().unwrap().add_remove(pack)?; } } } @@ -925,7 +925,7 @@ impl Pruner { PackToDo::Keep => { // keep pack: add to new index let pack = pack.into_index_pack(); - indexer.write().await.add(pack)?; + indexer.write().unwrap().add(pack)?; } PackToDo::Repack => { // TODO: repack in parallel @@ -940,9 +940,9 @@ impl Pruner { BlobType::Tree => &mut tree_repacker, }; if opts.fast_repack { - repacker.add_fast(&pack.id, blob).await?; + repacker.add_fast(&pack.id, blob)?; } else { - repacker.add(&pack.id, blob).await?; + repacker.add(&pack.id, blob)?; } p.inc(blob.length as u64); } @@ -951,7 +951,7 @@ impl Pruner { } else { // mark pack for removal let pack = pack.into_index_pack_with_time(self.time); - indexer.write().await.add_remove(pack)?; + indexer.write().unwrap().add_remove(pack)?; } } PackToDo::MarkDelete => { @@ -960,7 +960,7 @@ impl Pruner { } else { // mark pack for removal let pack = pack.into_index_pack_with_time(self.time); - indexer.write().await.add_remove(pack)?; + indexer.write().unwrap().add_remove(pack)?; } } PackToDo::KeepMarked => { @@ -969,22 +969,22 @@ impl Pruner { } else { // keep pack: add to new index let pack = pack.into_index_pack(); - indexer.write().await.add_remove(pack)?; + indexer.write().unwrap().add_remove(pack)?; } } PackToDo::Recover => { // recover pack: add to new index in section packs let pack = pack.into_index_pack_with_time(self.time); - indexer.write().await.add(pack)?; + indexer.write().unwrap().add(pack)?; } PackToDo::Delete => delete_pack(pack), } } indexes_remove.push(index.id); } - tree_repacker.finalize().await?; - data_repacker.finalize().await?; - indexer.write().await.finalize()?; + tree_repacker.finalize()?; + data_repacker.finalize()?; + indexer.write().unwrap().finalize()?; p.finish(); if !data_packs_remove.is_empty() { diff --git a/src/commands/repair.rs b/src/commands/repair.rs index d36bdcc..586d43d 100644 --- a/src/commands/repair.rs +++ b/src/commands/repair.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, HashSet}; use anyhow::Result; -use async_recursion::async_recursion; use clap::{AppSettings, Parser, Subcommand}; use log::*; @@ -89,7 +88,7 @@ pub(super) async fn execute( ) -> Result<()> { match opts.command { Command::Index(opt) => repair_index(be, opt).await, - Command::Snapshots(opt) => repair_snaps(be, opt, config_file, config).await, + Command::Snapshots(opt) => repair_snaps(be, opt, config_file, config), } } @@ -191,17 +190,17 @@ async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<( pack.set_id(id); pack.blobs = PackHeader::from_file(be, id, size_hint, packsize)?.into_blobs(); if !opts.dry_run { - indexer.write().await.add_with(pack, to_delete)?; + indexer.write().unwrap().add_with(pack, to_delete)?; } p.inc(1); } - indexer.write().await.finalize()?; + indexer.write().unwrap().finalize()?; p.finish(); Ok(()) } -async fn repair_snaps( +fn repair_snaps( be: &impl DecryptFullBackend, mut opts: SnapOpts, config_file: RusticConfig, @@ -238,9 +237,7 @@ async fn repair_snaps( &mut replaced, &mut seen, &opts, - ) - .await? - { + )? { (Changed::None, _) => { info!("snapshot {snap_id} is ok."); } @@ -267,8 +264,8 @@ async fn repair_snaps( } if !opts.dry_run { - packer.finalize().await?; - indexer.write().await.finalize()?; + packer.finalize()?; + indexer.write().unwrap().finalize()?; } if opts.delete { @@ -294,8 +291,7 @@ enum Changed { None, } -#[async_recursion] -async fn repair_tree( +fn repair_tree( be: &impl IndexedBackend, packer: &mut Packer, id: Option, @@ -352,7 +348,7 @@ async fn repair_tree( } NodeType::Dir {} => { let (c, tree_id) = - repair_tree(be, packer, node.subtree, replaced, seen, opts).await?; + repair_tree(be, packer, node.subtree, replaced, seen, opts)?; match c { Changed::None => {} Changed::This => { @@ -385,7 +381,7 @@ async fn repair_tree( // the tree has been changed => save it let (chunk, new_id) = tree.serialize()?; if !be.has_tree(&new_id) && !opts.dry_run { - packer.add(&chunk, &new_id).await?; + packer.add(&chunk, &new_id)?; } if let Some(id) = id { replaced.insert(id, (c, new_id)); diff --git a/src/index/indexer.rs b/src/index/indexer.rs index 4aba144..ca7d48b 100644 --- a/src/index/indexer.rs +++ b/src/index/indexer.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; use std::sync::Arc; +use std::sync::RwLock; use std::time::{Duration, SystemTime}; use anyhow::Result; -use tokio::sync::RwLock; use crate::backend::DecryptWriteBackend; use crate::id::Id;