Make packer sync

This commit is contained in:
Alexander Weiss 2022-10-20 00:04:23 +02:00
parent d8665fe794
commit fa0ac71aec
5 changed files with 121 additions and 97 deletions

View File

@ -188,7 +188,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
}
if !self.index.has_tree(&id) {
match self.tree_packer.add(&chunk, &id).await? {
match self.tree_packer.add(&chunk, &id)? {
0 => {}
packed_size => {
self.summary.tree_blobs += 1;
@ -242,13 +242,13 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
if queue.len() > 8 {
let (id, chunk, size) = queue.next().await.unwrap()?;
self.process_data_junk(id, &chunk, size, &p).await?;
self.process_data_junk(id, &chunk, size, &p)?;
content.push(id);
}
}
while let Some(Ok((id, chunk, size))) = queue.next().await {
self.process_data_junk(id, &chunk, size, &p).await?;
self.process_data_junk(id, &chunk, size, &p)?;
content.push(id);
}
@ -258,7 +258,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
Ok(())
}
async fn process_data_junk(
fn process_data_junk(
&mut self,
id: Id,
chunk: &[u8],
@ -266,7 +266,7 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
p: &ProgressBar,
) -> Result<()> {
if !self.index.has_data(&id) {
match self.data_packer.add(chunk, &id).await? {
match self.data_packer.add(chunk, &id)? {
0 => {}
packed_size => {
self.summary.data_blobs += 1;
@ -286,14 +286,14 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
let (chunk, id) = self.tree.serialize()?;
if !self.index.has_tree(&id) {
self.tree_packer.add(&chunk, &id).await?;
self.tree_packer.add(&chunk, &id)?;
}
self.snap.tree = id;
self.data_packer.finalize().await?;
self.tree_packer.finalize().await?;
self.data_packer.finalize()?;
self.tree_packer.finalize()?;
{
let indexer = self.indexer.write().await;
let indexer = self.indexer.write().unwrap();
indexer.finalize()?;
}
let end_time = Local::now();

View File

@ -5,7 +5,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Result};
use bytes::{Bytes, BytesMut};
use chrono::Local;
use tokio::{spawn, task::JoinHandle};
use crossbeam_channel::{bounded, Receiver, Sender};
use zstd::encode_all;
use super::BlobType;
@ -74,7 +74,7 @@ pub struct Packer<BE: DecryptWriteBackend> {
index: IndexPack,
indexer: SharedIndexer<BE>,
hasher: Hasher,
file_writer: FileWriter<BE>,
file_writer: Actor<(Bytes, Id, IndexPack)>,
zstd: Option<i32>,
pack_sizer: PackSizer,
}
@ -87,12 +87,15 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
config: &ConfigFile,
total_size: u64,
) -> Result<Self> {
let file_writer = FileWriter {
future: None,
be: be.clone(),
indexer: indexer.clone(),
cacheable: blob_type.is_cacheable(),
};
let file_writer = Actor::new(
FileWriterHandle {
be: be.clone(),
indexer: indexer.clone(),
cacheable: blob_type.is_cacheable(),
},
1,
1,
);
let zstd = config.zstd()?;
let pack_sizer = PackSizer::from_config(config, blob_type, total_size);
Ok(Self {
@ -111,12 +114,13 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
})
}
pub async fn finalize(&mut self) -> Result<()> {
self.save().await?;
self.file_writer.finalize().await
pub fn finalize(mut self) -> Result<()> {
self.save()?;
self.file_writer.finalize()?;
Ok(())
}
pub async fn write_data(&mut self, data: &[u8]) -> Result<u32> {
pub fn write_data(&mut self, data: &[u8]) -> Result<u32> {
self.hasher.update(data);
let len = data.len().try_into()?;
self.file.extend_from_slice(data);
@ -125,25 +129,20 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
}
// adds the blob to the packfile; returns the actually added size
pub async fn add(&mut self, data: &[u8], id: &Id) -> Result<u64> {
pub fn add(&mut self, data: &[u8], id: &Id) -> Result<u64> {
// compute size limit based on total size and size bounds
let size_limit = self.pack_sizer.pack_size();
self.add_with_sizelimit(data, id, size_limit).await
self.add_with_sizelimit(data, id, size_limit)
}
// adds the blob to the packfile; returns the actually added size
pub async fn add_with_sizelimit(
&mut self,
data: &[u8],
id: &Id,
size_limit: u32,
) -> Result<u64> {
pub fn add_with_sizelimit(&mut self, data: &[u8], id: &Id, size_limit: u32) -> Result<u64> {
// only add if this blob is not present
if self.has(id) {
return Ok(0);
}
{
let indexer = self.indexer.read().await;
let indexer = self.indexer.read().unwrap();
if indexer.has(id) {
return Ok(0);
}
@ -167,13 +166,12 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
};
// add using current total_size as repo_size
self.add_raw(&data, id, uncompressed_length, size_limit)
.await?;
self.add_raw(&data, id, uncompressed_length, size_limit)?;
Ok(data.len().try_into()?)
}
// adds the already compressed/encrypted blob to the packfile without any check
pub async fn add_raw(
pub fn add_raw(
&mut self,
data: &[u8],
id: &Id,
@ -181,7 +179,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
size_limit: u32,
) -> Result<()> {
let offset = self.size;
let len = self.write_data(data).await?;
let len = self.write_data(data)?;
self.index
.add(*id, self.blob_type, offset, len, uncompressed_length);
self.count += 1;
@ -190,7 +188,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
if self.count >= MAX_COUNT || self.size >= size_limit || self.created.elapsed()? >= MAX_AGE
{
self.pack_sizer.add_size(self.index.pack_size());
self.save().await?;
self.save()?;
self.size = 0;
self.count = 0;
self.created = SystemTime::now();
@ -200,7 +198,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
}
/// writes header and length of header to packfile
pub async fn write_header(&mut self) -> Result<()> {
pub fn write_header(&mut self) -> Result<()> {
// comput the pack header
let data = PackHeaderRef::from_index_pack(&self.index).to_binary()?;
@ -212,21 +210,20 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
.map_err(|_| anyhow!("crypto error"))?;
let headerlen = data.len().try_into()?;
self.write_data(&data).await?;
self.write_data(&data)?;
// finally write length of header unencrypted to pack file
self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?)
.await?;
self.write_data(&PackHeaderLength::from_u32(headerlen).to_binary()?)?;
Ok(())
}
pub async fn save(&mut self) -> Result<()> {
pub fn save(&mut self) -> Result<()> {
if self.size == 0 {
return Ok(());
}
self.write_header().await?;
self.write_header()?;
// compute id of packfile
let id = self.hasher.finalize();
@ -235,7 +232,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
// write file to backend
let index = std::mem::take(&mut self.index);
let file = std::mem::replace(&mut self.file, BytesMut::new());
self.file_writer.add(index, file.into(), id).await?;
self.file_writer.send((file.into(), id, index))?;
Ok(())
}
@ -245,36 +242,69 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
}
}
struct FileWriter<BE: DecryptWriteBackend> {
future: Option<JoinHandle<Result<()>>>,
#[derive(Clone)]
struct FileWriterHandle<BE: DecryptWriteBackend> {
be: BE,
indexer: SharedIndexer<BE>,
cacheable: bool,
}
impl<BE: DecryptWriteBackend> FileWriter<BE> {
async fn add(&mut self, mut index: IndexPack, file: Bytes, id: Id) -> Result<()> {
let be = self.be.clone();
let indexer = self.indexer.clone();
let cacheable = self.cacheable;
let new_future = spawn(async move {
be.write_bytes(FileType::Pack, &id, cacheable, file)?;
index.time = Some(Local::now());
indexer.write().await.add(index)?;
Ok(())
impl<BE: DecryptWriteBackend> ActorHandle<(Bytes, Id, IndexPack)> for FileWriterHandle<BE> {
fn process(&self, load: (Bytes, Id, IndexPack)) -> Result<()> {
let (file, id, mut index) = load;
self.be
.write_bytes(FileType::Pack, &id, self.cacheable, file)?;
index.time = Some(Local::now());
self.indexer.write().unwrap().add(index)?;
Ok(())
}
}
pub trait ActorHandle<T>: Clone + Send + 'static {
fn process(&self, load: T) -> Result<()>;
}
pub struct Actor<T> {
sender: Sender<T>,
finish: Receiver<Result<()>>,
}
impl<T: Send + Sync + 'static> Actor<T> {
pub fn new(fwh: impl ActorHandle<T>, queue_len: usize, par: usize) -> Self {
let (tx, rx) = bounded(queue_len);
let (finish_tx, finish_rx) = bounded::<Result<()>>(0);
(0..par).for_each(|_| {
let rx = rx.clone();
let finish_tx = finish_tx.clone();
let fwh = fwh.clone();
std::thread::spawn(move || {
let mut status = Ok(());
for load in rx {
// only keep processing if there was no error
if status.is_ok() {
status = fwh.process(load);
}
}
let _ = finish_tx.send(status);
});
});
if let Some(fut) = self.future.replace(new_future) {
fut.await??;
Self {
sender: tx,
finish: finish_rx,
}
}
pub fn send(&self, load: T) -> Result<()> {
self.sender.send(load)?;
Ok(())
}
async fn finalize(&mut self) -> Result<()> {
if let Some(fut) = self.future.take() {
return fut.await?;
}
Ok(())
pub fn finalize(self) -> Result<()> {
// cancel channel
drop(self.sender);
// wait for items in channel to be processed
self.finish.recv().unwrap()
}
}
@ -301,7 +331,7 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
})
}
pub async fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
pub fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
let data = self.be.read_partial(
FileType::Pack,
pack_id,
@ -310,12 +340,11 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
blob.length,
)?;
self.packer
.add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit)
.await?;
.add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit)?;
Ok(())
}
pub async fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
pub fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
let data = self.be.read_encrypted_partial(
FileType::Pack,
pack_id,
@ -325,12 +354,11 @@ impl<BE: DecryptFullBackend> Repacker<BE> {
blob.uncompressed_length,
)?;
self.packer
.add_with_sizelimit(&data, &blob.id, self.size_limit)
.await?;
.add_with_sizelimit(&data, &blob.id, self.size_limit)?;
Ok(())
}
pub async fn finalize(&mut self) -> Result<()> {
self.packer.finalize().await
pub fn finalize(self) -> Result<()> {
self.packer.finalize()
}
}

View File

@ -173,7 +173,7 @@ pub(super) async fn execute(
wait(opts.warm_up_wait).await;
if !opts.dry_run {
pruner.do_prune(be, opts, config).await?;
pruner.do_prune(be, opts, config)?;
}
Ok(())
}
@ -828,7 +828,7 @@ impl Pruner {
.collect()
}
async fn do_prune(
fn do_prune(
mut self,
be: &impl DecryptFullBackend,
opts: Opts,
@ -888,7 +888,7 @@ impl Pruner {
time: Some(Local::now()),
blobs: Vec::new(),
};
indexer.write().await.add_remove(pack)?;
indexer.write().unwrap().add_remove(pack)?;
}
}
}
@ -925,7 +925,7 @@ impl Pruner {
PackToDo::Keep => {
// keep pack: add to new index
let pack = pack.into_index_pack();
indexer.write().await.add(pack)?;
indexer.write().unwrap().add(pack)?;
}
PackToDo::Repack => {
// TODO: repack in parallel
@ -940,9 +940,9 @@ impl Pruner {
BlobType::Tree => &mut tree_repacker,
};
if opts.fast_repack {
repacker.add_fast(&pack.id, blob).await?;
repacker.add_fast(&pack.id, blob)?;
} else {
repacker.add(&pack.id, blob).await?;
repacker.add(&pack.id, blob)?;
}
p.inc(blob.length as u64);
}
@ -951,7 +951,7 @@ impl Pruner {
} else {
// mark pack for removal
let pack = pack.into_index_pack_with_time(self.time);
indexer.write().await.add_remove(pack)?;
indexer.write().unwrap().add_remove(pack)?;
}
}
PackToDo::MarkDelete => {
@ -960,7 +960,7 @@ impl Pruner {
} else {
// mark pack for removal
let pack = pack.into_index_pack_with_time(self.time);
indexer.write().await.add_remove(pack)?;
indexer.write().unwrap().add_remove(pack)?;
}
}
PackToDo::KeepMarked => {
@ -969,22 +969,22 @@ impl Pruner {
} else {
// keep pack: add to new index
let pack = pack.into_index_pack();
indexer.write().await.add_remove(pack)?;
indexer.write().unwrap().add_remove(pack)?;
}
}
PackToDo::Recover => {
// recover pack: add to new index in section packs
let pack = pack.into_index_pack_with_time(self.time);
indexer.write().await.add(pack)?;
indexer.write().unwrap().add(pack)?;
}
PackToDo::Delete => delete_pack(pack),
}
}
indexes_remove.push(index.id);
}
tree_repacker.finalize().await?;
data_repacker.finalize().await?;
indexer.write().await.finalize()?;
tree_repacker.finalize()?;
data_repacker.finalize()?;
indexer.write().unwrap().finalize()?;
p.finish();
if !data_packs_remove.is_empty() {

View File

@ -1,7 +1,6 @@
use std::collections::{HashMap, HashSet};
use anyhow::Result;
use async_recursion::async_recursion;
use clap::{AppSettings, Parser, Subcommand};
use log::*;
@ -89,7 +88,7 @@ pub(super) async fn execute(
) -> Result<()> {
match opts.command {
Command::Index(opt) => repair_index(be, opt).await,
Command::Snapshots(opt) => repair_snaps(be, opt, config_file, config).await,
Command::Snapshots(opt) => repair_snaps(be, opt, config_file, config),
}
}
@ -191,17 +190,17 @@ async fn repair_index(be: &impl DecryptFullBackend, opts: IndexOpts) -> Result<(
pack.set_id(id);
pack.blobs = PackHeader::from_file(be, id, size_hint, packsize)?.into_blobs();
if !opts.dry_run {
indexer.write().await.add_with(pack, to_delete)?;
indexer.write().unwrap().add_with(pack, to_delete)?;
}
p.inc(1);
}
indexer.write().await.finalize()?;
indexer.write().unwrap().finalize()?;
p.finish();
Ok(())
}
async fn repair_snaps(
fn repair_snaps(
be: &impl DecryptFullBackend,
mut opts: SnapOpts,
config_file: RusticConfig,
@ -238,9 +237,7 @@ async fn repair_snaps(
&mut replaced,
&mut seen,
&opts,
)
.await?
{
)? {
(Changed::None, _) => {
info!("snapshot {snap_id} is ok.");
}
@ -267,8 +264,8 @@ async fn repair_snaps(
}
if !opts.dry_run {
packer.finalize().await?;
indexer.write().await.finalize()?;
packer.finalize()?;
indexer.write().unwrap().finalize()?;
}
if opts.delete {
@ -294,8 +291,7 @@ enum Changed {
None,
}
#[async_recursion]
async fn repair_tree<BE: DecryptWriteBackend>(
fn repair_tree<BE: DecryptWriteBackend>(
be: &impl IndexedBackend,
packer: &mut Packer<BE>,
id: Option<Id>,
@ -352,7 +348,7 @@ async fn repair_tree<BE: DecryptWriteBackend>(
}
NodeType::Dir {} => {
let (c, tree_id) =
repair_tree(be, packer, node.subtree, replaced, seen, opts).await?;
repair_tree(be, packer, node.subtree, replaced, seen, opts)?;
match c {
Changed::None => {}
Changed::This => {
@ -385,7 +381,7 @@ async fn repair_tree<BE: DecryptWriteBackend>(
// the tree has been changed => save it
let (chunk, new_id) = tree.serialize()?;
if !be.has_tree(&new_id) && !opts.dry_run {
packer.add(&chunk, &new_id).await?;
packer.add(&chunk, &new_id)?;
}
if let Some(id) = id {
replaced.insert(id, (c, new_id));

View File

@ -1,9 +1,9 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::{Duration, SystemTime};
use anyhow::Result;
use tokio::sync::RwLock;
use crate::backend::DecryptWriteBackend;
use crate::id::Id;