diff --git a/Cargo.lock b/Cargo.lock index 8d9ad16..add57ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-map" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ddfe61e8040145222887d0d32a939c70c8cae681490d72fb868305e9b40ced8" +dependencies = [ + "enum-map-derive", +] + +[[package]] +name = "enum-map-derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00d1c54e25a57236a790ecf051c2befbb57740c9b86c4273eac378ba84d620d6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "fastrand" version = "1.7.0" @@ -846,6 +866,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-sqrt" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "276ec31bcb4a9ee45f58bec6f9ec700ae4cf4f4f8f2fa7e06cb406bd5ffdd770" +dependencies = [ + "num-traits", +] + [[package]] name = "ipnet" version = "2.5.0" @@ -1366,6 +1395,8 @@ dependencies = [ "derive-getters", "derive_more", "dirs 4.0.0", + "enum-map", + "enum-map-derive", "filetime", "futures", "gethostname", @@ -1373,6 +1404,7 @@ dependencies = [ "humantime", "ignore", "indicatif", + "integer-sqrt", "itertools", "lazy_static", "nix", diff --git a/Cargo.toml b/Cargo.toml index fa61a31..ef0aa83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,9 @@ aes256ctr_poly1305aes = "0.1" sha2 = "0.10" rand = "0.8" scrypt = { version = "0.10", default-features = false } -# chunker +# chunker / packer cdc = "0.1" +integer-sqrt = "0.1" # serialization base64 = "0.13" binrw = "0.9" @@ -47,6 +48,8 @@ serde-aux = "3" chrono = { version = "0.4", features = ["serde"] } tempfile = "3" zstd = "0.11" +enum-map = "2" +enum-map-derive = "0.9" # local backend walkdir = "2" ignore = "0.4" diff --git a/src/archiver/archiver_impl.rs b/src/archiver/archiver_impl.rs index f2541a2..755d820 100644 --- 
a/src/archiver/archiver_impl.rs +++ b/src/archiver/archiver_impl.rs @@ -15,7 +15,7 @@ use crate::chunker::ChunkIter; use crate::crypto::hash; use crate::id::Id; use crate::index::{IndexedBackend, Indexer, SharedIndexer}; -use crate::repo::{SnapshotFile, SnapshotSummary}; +use crate::repo::{ConfigFile, SnapshotFile, SnapshotSummary}; use super::{Parent, ParentResult}; @@ -38,23 +38,37 @@ impl Archiver { pub fn new( be: BE, index: I, - poly: u64, + config: &ConfigFile, parent: Parent, mut snap: SnapshotFile, - zstd: Option, ) -> Result { let indexer = Indexer::new(be.clone()).into_shared(); let mut summary = snap.summary.take().unwrap(); summary.backup_start = Local::now(); + let poly = config.poly()?; + let data_packer = Packer::new( + be.clone(), + BlobType::Data, + indexer.clone(), + config, + index.total_size(&BlobType::Data), + )?; + let tree_packer = Packer::new( + be.clone(), + BlobType::Tree, + indexer.clone(), + config, + index.total_size(&BlobType::Tree), + )?; Ok(Self { path: PathBuf::from("/"), tree: Tree::new(), parent, stack: Vec::new(), index, - data_packer: Packer::new(be.clone(), BlobType::Data, indexer.clone(), zstd)?, - tree_packer: Packer::new(be.clone(), BlobType::Tree, indexer.clone(), zstd)?, + data_packer, + tree_packer, be, poly, indexer, diff --git a/src/blob/mod.rs b/src/blob/mod.rs index 7228bdb..e581e00 100644 --- a/src/blob/mod.rs +++ b/src/blob/mod.rs @@ -5,11 +5,14 @@ pub use packer::*; pub use tree::*; use derive_more::Constructor; +use enum_map::{Enum, EnumMap}; use serde::{Deserialize, Serialize}; use crate::id::Id; -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Enum, +)] pub enum BlobType { #[serde(rename = "tree")] Tree, @@ -26,6 +29,8 @@ impl BlobType { } } +pub type BlobTypeMap = EnumMap; + #[derive(Debug, PartialEq, Clone, Constructor)] pub struct Blob { tpe: BlobType, diff --git 
a/src/blob/packer.rs b/src/blob/packer.rs index 8ff7fb6..90b7b33 100644 --- a/src/blob/packer.rs +++ b/src/blob/packer.rs @@ -1,3 +1,4 @@ +use integer_sqrt::IntegerSquareRoot; use std::fs::File; use std::io::{Seek, SeekFrom, Write}; use std::num::NonZeroU32; @@ -11,18 +12,44 @@ use tokio::{spawn, task::JoinHandle}; use zstd::encode_all; use super::BlobType; -use crate::backend::{DecryptWriteBackend, FileType}; +use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType}; use crate::crypto::{CryptoKey, Hasher}; use crate::id::Id; use crate::index::SharedIndexer; -use crate::repo::IndexPack; +use crate::repo::{ConfigFile, IndexBlob, IndexPack}; const KB: u32 = 1024; const MB: u32 = 1024 * KB; -const MAX_SIZE: u32 = 4 * MB; +// the absolute maximum size of a pack: including headers it should not exceed 4 GB +const MAX_SIZE: u32 = 4076 * MB; const MAX_COUNT: u32 = 10_000; const MAX_AGE: Duration = Duration::from_secs(300); +struct PackSizer { + default_size: u32, + grow_factor: u32, + current_size: u64, +} + +impl PackSizer { + pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self { + let (default_size, grow_factor) = config.packsize(blob_type); + Self { + default_size, + grow_factor, + current_size, + } + } + + pub fn pack_size(&self) -> u32 { + let size = self.current_size.integer_sqrt().saturating_mul(self.grow_factor.into()); + size.saturating_add(self.default_size.into()).min(MAX_SIZE.into()) as u32 + } + + fn add_size(&mut self, added: u32) { + self.current_size += added as u64; + } +} pub struct Packer { be: BE, blob_type: BlobType, @@ -35,6 +62,7 @@ pub struct Packer { hasher: Hasher, file_writer: FileWriter, zstd: Option, + pack_sizer: PackSizer, } impl Packer { @@ -42,7 +70,8 @@ impl Packer { be: BE, blob_type: BlobType, indexer: SharedIndexer, - zstd: Option, + config: &ConfigFile, + total_size: u64, ) -> Result { let file_writer = FileWriter { future: None, @@ -50,6 +79,8 @@ impl Packer { indexer: indexer.clone(), cacheable: blob_type.is_cacheable(), }; + let zstd = 
config.zstd()?; + let pack_sizer = PackSizer::from_config(config, blob_type, total_size); Ok(Self { be, blob_type, @@ -62,6 +93,7 @@ impl Packer { hasher: Hasher::new(), file_writer, zstd, + pack_sizer, }) } @@ -79,6 +111,18 @@ impl Packer { // adds the blob to the packfile; returns the actually added size pub async fn add(&mut self, data: &[u8], id: &Id) -> Result { + // compute size limit based on total size and size bounds + let size_limit = self.pack_sizer.pack_size(); + self.add_with_sizelimit(data, id, size_limit).await + } + + // adds the blob to the packfile; returns the actually added size + pub async fn add_with_sizelimit( + &mut self, + data: &[u8], + id: &Id, + size_limit: u32, + ) -> Result { // only add if this blob is not present if self.has(id) { return Ok(0); @@ -107,7 +151,9 @@ impl Packer { ), }; - self.add_raw(&data, id, uncompressed_length).await?; + // add using current total_size as repo_size + self.add_raw(&data, id, uncompressed_length, size_limit) + .await?; Ok(data.len().try_into()?) } @@ -117,6 +163,7 @@ impl Packer { data: &[u8], id: &Id, uncompressed_length: Option, + size_limit: u32, ) -> Result<()> { let offset = self.size; let len = self.write_data(data).await?; @@ -125,7 +172,9 @@ impl Packer { self.count += 1; // check if PackFile needs to be saved - if self.count >= MAX_COUNT || self.size >= MAX_SIZE || self.created.elapsed()? >= MAX_AGE { + if self.count >= MAX_COUNT || self.size >= size_limit || self.created.elapsed()? 
>= MAX_AGE + { + self.pack_sizer.add_size(self.index.pack_size()); self.save().await?; self.size = 0; self.count = 0; @@ -260,3 +309,65 @@ impl FileWriter { Ok(()) } } + +pub struct Repacker { + be: BE, + packer: Packer, + size_limit: u32, +} + +impl Repacker { + pub fn new( + be: BE, + blob_type: BlobType, + indexer: SharedIndexer, + config: &ConfigFile, + total_size: u64, + ) -> Result { + let packer = Packer::new(be.clone(), blob_type, indexer, config, total_size)?; + let size_limit = packer.pack_sizer.pack_size(); + Ok(Self { + be, + packer, + size_limit, + }) + } + + pub async fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { + let data = self + .be + .read_partial( + FileType::Pack, + pack_id, + blob.tpe.is_cacheable(), + blob.offset, + blob.length, + ) + .await?; + self.packer + .add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit) + .await?; + Ok(()) + } + + pub async fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> { + let data = self + .be + .read_encrypted_partial( + FileType::Pack, + pack_id, + blob.tpe.is_cacheable(), + blob.offset, + blob.length, + ) + .await?; + self.packer + .add_with_sizelimit(&data, &blob.id, self.size_limit) + .await?; + Ok(()) + } + + pub async fn finalize(&mut self) -> Result<()> { + self.packer.finalize().await + } +} diff --git a/src/commands/backup.rs b/src/commands/backup.rs index b602479..8a80088 100644 --- a/src/commands/backup.rs +++ b/src/commands/backup.rs @@ -56,7 +56,6 @@ pub(super) async fn execute( command: String, ) -> Result<()> { let time = Local::now(); - let poly = config.poly()?; let zstd = config.zstd()?; let mut be = DryRunBackend::new(be.clone(), opts.dry_run); be.set_zstd(zstd); @@ -129,9 +128,8 @@ pub(super) async fn execute( } else { 0 }; - v1!("starting backup..."); - let mut archiver = Archiver::new(be, index, poly, parent, snap, zstd)?; + let mut archiver = Archiver::new(be, index, &config, parent, snap)?; let p = progress_bytes(); 
p.set_length(size); for item in src { diff --git a/src/commands/config.rs b/src/commands/config.rs index 8e63758..14ab687 100644 --- a/src/commands/config.rs +++ b/src/commands/config.rs @@ -1,4 +1,5 @@ use anyhow::{bail, Result}; +use bytesize::ByteSize; use clap::Parser; use crate::backend::DecryptFullBackend; @@ -36,6 +37,30 @@ pub(super) struct ConfigOpts { /// set repository version #[clap(long, value_name = "VERSION")] pub set_version: Option, + + /// Set default packsize for tree packs. rustic tries to always produce packs greater than this value. + /// Note that for large repos, this value is grown by the grow factor. + /// Defaults to 4 MiB if not set. + #[clap(long, value_name = "SIZE")] + pub set_treepack_size: Option, + + /// Set grow factor for tree packs. The default packsize grows by the square root of the reposize + /// multiplied with this factor. This means 32 kiB times this factor per square root of reposize in GiB. + /// Defaults to 32 (= 1MB per square root of reposize in GiB) if not set. + #[clap(long, value_name = "FACTOR")] + pub set_treepack_growfactor: Option, + + /// Set default packsize for data packs. rustic tries to always produce packs greater than this value. + /// Note that for large repos, this value is grown by the grow factor. + /// Defaults to 32 MiB if not set. + #[clap(long, value_name = "SIZE")] + pub set_datapack_size: Option, + + /// Set grow factor for data packs. The default packsize grows by the square root of the reposize + /// multiplied with this factor. This means 32 kiB times this factor per square root of reposize in GiB. + /// Defaults to 32 (= 1MB per square root of reposize in GiB) if not set. 
+ #[clap(long, value_name = "FACTOR")] + pub set_datapack_growfactor: Option, } impl ConfigOpts { @@ -71,6 +96,19 @@ impl ConfigOpts { config.compression = Some(compression); } + if let Some(size) = self.set_treepack_size { + config.treepack_size = Some(size.as_u64().try_into()?); + } + if let Some(factor) = self.set_treepack_growfactor { + config.treepack_growfactor = Some(factor); + } + if let Some(size) = self.set_datapack_size { + config.datapack_size = Some(size.as_u64().try_into()?); + } + if let Some(factor) = self.set_datapack_growfactor { + config.datapack_growfactor = Some(factor); + } + Ok(()) } } diff --git a/src/commands/prune.rs b/src/commands/prune.rs index ed32fc6..9981c3d 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -6,12 +6,13 @@ use anyhow::{anyhow, bail, Result}; use bytesize::ByteSize; use chrono::{DateTime, Duration, Local}; use clap::Parser; +use derive_more::Add; use futures::{future, TryStreamExt}; use vlog::*; use super::{bytes, progress_counter}; use crate::backend::{DecryptFullBackend, DecryptReadBackend, FileType}; -use crate::blob::{BlobType, NodeType, Packer, TreeStreamerOnce}; +use crate::blob::{BlobType, BlobTypeMap, NodeType, Repacker, TreeStreamerOnce}; use crate::id::Id; use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend, Indexer}; use crate::repo::{ConfigFile, IndexBlob, IndexFile, IndexPack, SnapshotFile}; @@ -158,14 +159,13 @@ struct PackStats { repack: u64, keep: u64, } -#[derive(Default)] +#[derive(Default, Clone, Copy, Add)] struct SizeStats { used: u64, unused: u64, remove: u64, repack: u64, repackrm: u64, - unref: u64, } impl SizeStats { @@ -185,11 +185,21 @@ struct PruneStats { packs_to_delete: DeleteStats, size_to_delete: DeleteStats, packs: PackStats, - blobs: SizeStats, - size: SizeStats, + blobs: BlobTypeMap, + size: BlobTypeMap, + size_unref: u64, index_files: u64, } +impl PruneStats { + fn total_blobs(&self) -> SizeStats { + self.blobs[BlobType::Tree] + 
self.blobs[BlobType::Data] + } + fn total_size(&self) -> SizeStats { + self.size[BlobType::Tree] + self.size[BlobType::Data] + } +} + #[derive(Debug)] struct PruneIndex { id: Id, @@ -265,32 +275,33 @@ impl PrunePack { } fn set_todo(&mut self, todo: PackToDo, pi: &PackInfo, stats: &mut PruneStats) { + let tpe = self.blob_type; match todo { PackToDo::Undecided => panic!("not possible"), PackToDo::Keep => { - stats.blobs.used += pi.used_blobs as u64; - stats.blobs.unused += pi.unused_blobs as u64; - stats.size.used += pi.used_size as u64; - stats.size.unused += pi.unused_size as u64; + stats.blobs[tpe].used += pi.used_blobs as u64; + stats.blobs[tpe].unused += pi.unused_blobs as u64; + stats.size[tpe].used += pi.used_size as u64; + stats.size[tpe].unused += pi.unused_size as u64; stats.packs.keep += 1; } PackToDo::Repack => { - stats.blobs.used += pi.used_blobs as u64; - stats.blobs.unused += pi.unused_blobs as u64; - stats.size.used += pi.used_size as u64; - stats.size.unused += pi.unused_size as u64; + stats.blobs[tpe].used += pi.used_blobs as u64; + stats.blobs[tpe].unused += pi.unused_blobs as u64; + stats.size[tpe].used += pi.used_size as u64; + stats.size[tpe].unused += pi.unused_size as u64; stats.packs.repack += 1; - stats.blobs.repack += (pi.unused_blobs + pi.used_blobs) as u64; - stats.blobs.repackrm += pi.unused_blobs as u64; - stats.size.repack += (pi.unused_size + pi.used_size) as u64; - stats.size.repackrm += pi.unused_size as u64; + stats.blobs[tpe].repack += (pi.unused_blobs + pi.used_blobs) as u64; + stats.blobs[tpe].repackrm += pi.unused_blobs as u64; + stats.size[tpe].repack += (pi.unused_size + pi.used_size) as u64; + stats.size[tpe].repackrm += pi.unused_size as u64; } PackToDo::MarkDelete => { - stats.blobs.unused += pi.unused_blobs as u64; - stats.size.unused += pi.unused_size as u64; - stats.blobs.remove += pi.unused_blobs as u64; - stats.size.remove += pi.unused_size as u64; + stats.blobs[tpe].unused += pi.unused_blobs as u64; + 
stats.size[tpe].unused += pi.unused_size as u64; + stats.blobs[tpe].remove += pi.unused_blobs as u64; + stats.size[tpe].remove += pi.unused_size as u64; } PackToDo::Recover => { stats.packs_to_delete.recover += 1; @@ -509,13 +520,13 @@ impl Pruner { // if percentag is given, we want to have // unused <= p/100 * size_after = p/100 * (size_used + unused) // which equals (1 - p/100) * unused <= p/100 * size_used - (false, LimitOption::Percentage(p)) => (p * self.stats.size.used) / (100 - p), + (false, LimitOption::Percentage(p)) => (p * self.stats.total_size().used) / (100 - p), }; let max_repack = match max_repack { LimitOption::Unlimited => u64::MAX, LimitOption::Size(size) => size.as_u64(), - LimitOption::Percentage(p) => (p * self.stats.size.total()), + LimitOption::Percentage(p) => (p * self.stats.total_size().total()) / 100, }; self.repack_candidates.sort_unstable_by_key(|rc| rc.0); @@ -523,10 +534,11 @@ impl Pruner { for (pi, index_num, pack_num) in std::mem::take(&mut self.repack_candidates) { let pack = &mut self.index_files[index_num].packs[pack_num]; - let repack_size_new = self.stats.size.repack + (pi.unused_size + pi.used_size) as u64; + let repack_size_new = + self.stats.total_size().repack + (pi.unused_size + pi.used_size) as u64; if repack_size_new >= max_repack || (pi.blob_type != BlobType::Tree - && self.stats.size.unused_after_prune() < max_unused) + && self.stats.total_size().unused_after_prune() < max_unused) { pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); } else { @@ -577,7 +589,7 @@ impl Pruner { // all remaining packs in existing_packs are unreferenced packs for size in self.existing_packs.values() { - self.stats.size.unref += *size as u64; + self.stats.size_unref += *size as u64; } Ok(()) @@ -614,8 +626,8 @@ impl Pruner { fn print_stats(&self) { let pack_stat = &self.stats.packs; - let blob_stat = &self.stats.blobs; - let size_stat = &self.stats.size; + let blob_stat = self.stats.total_blobs(); + let size_stat = self.stats.total_size(); v2!( 
"used: {:>10} blobs, {:>10}", @@ -657,14 +669,14 @@ impl Pruner { v1!( "unindexed: {:>10} packs, ?? blobs, {:>10}", self.existing_packs.len(), - bytes(size_stat.unref) + bytes(self.stats.size_unref) ); } v1!( "total prune: {:>10} blobs, {:>10}", blob_stat.repackrm + blob_stat.remove, - bytes(size_stat.repackrm + size_stat.remove + size_stat.unref) + bytes(size_stat.repackrm + size_stat.remove + self.stats.size_unref) ); v1!( "remaining: {:>10} blobs, {:>10}", @@ -720,8 +732,38 @@ impl Pruner { be.set_zstd(zstd); let indexer = Indexer::new_unindexed(be.clone()).into_shared(); - let mut tree_packer = Packer::new(be.clone(), BlobType::Tree, indexer.clone(), zstd)?; - let mut data_packer = Packer::new(be.clone(), BlobType::Data, indexer.clone(), zstd)?; + + // Calculate an approximation of sizes after pruning. + // The size actually is: + // total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead + // This is hard/impossible to compute because: + // - the size of blobs can change during repacking if compression is changed + // - the size of pack headers depends on wheter blobs are compressed or not + // - we don't know the number of packs generated by repacking + // So, we simply use the current size of the blobs and an estimation of the pack + // header size. 
+ let tree_size_after_prune = self.stats.size[BlobType::Tree].total_after_prune() + + self.stats.blobs[BlobType::Tree].total_after_prune() + * IndexPack::HEADER_LEN_COMPRESSED as u64; + let data_size_after_prune = self.stats.size[BlobType::Data].total_after_prune() + + self.stats.blobs[BlobType::Data].total_after_prune() + * IndexPack::HEADER_LEN_COMPRESSED as u64; + + let mut tree_repacker = Repacker::new( + be.clone(), + BlobType::Tree, + indexer.clone(), + &config, + tree_size_after_prune, + )?; + + let mut data_repacker = Repacker::new( + be.clone(), + BlobType::Data, + indexer.clone(), + &config, + data_size_after_prune, + )?; // mark unreferenced packs for deletion if !self.existing_packs.is_empty() { @@ -780,38 +822,15 @@ impl Pruner { // don't save duplicate blobs continue; } + + let repacker = match blob.tpe { + BlobType::Data => &mut data_repacker, + BlobType::Tree => &mut tree_repacker, + }; if opts.fast_repack { - let data = be - .read_partial( - FileType::Pack, - &pack.id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, - ) - .await?; - match blob.tpe { - BlobType::Data => &mut data_packer, - BlobType::Tree => &mut tree_packer, - } - .add_raw(&data, &blob.id, blob.uncompressed_length) - .await?; + repacker.add_fast(&pack.id, blob).await?; } else { - let data = be - .read_encrypted_partial( - FileType::Pack, - &pack.id, - blob.tpe.is_cacheable(), - blob.offset, - blob.length, - ) - .await?; - match blob.tpe { - BlobType::Data => &mut data_packer, - BlobType::Tree => &mut tree_packer, - } - .add(&data, &blob.id) - .await?; + repacker.add(&pack.id, blob).await?; } } if opts.instant_delete { @@ -850,8 +869,8 @@ impl Pruner { } indexes_remove.push(index.id); } - tree_packer.finalize().await?; - data_packer.finalize().await?; + tree_repacker.finalize().await?; + data_repacker.finalize().await?; indexer.write().await.finalize().await?; if !data_packs_remove.is_empty() { diff --git a/src/commands/repoinfo.rs b/src/commands/repoinfo.rs index 
a1ceeaa..007e2e8 100644 --- a/src/commands/repoinfo.rs +++ b/src/commands/repoinfo.rs @@ -8,7 +8,7 @@ use super::{bytes, progress_counter}; use crate::backend::{DecryptReadBackend, ReadBackend, ALL_FILE_TYPES}; use crate::blob::BlobType; use crate::index::IndexEntry; -use crate::repo::IndexFile; +use crate::repo::{IndexFile, IndexPack}; #[derive(Parser)] pub(super) struct Opts; @@ -32,6 +32,10 @@ pub(super) async fn execute( count: u64, size: u64, data_size: u64, + pack_count: u64, + total_pack_size: u64, + min_pack_size: u64, + max_pack_size: u64, } impl Info { @@ -40,15 +44,34 @@ pub(super) async fn execute( self.size += *ie.length() as u64; self.data_size += ie.data_length() as u64; } + + fn add_pack(&mut self, ip: &IndexPack) { + self.pack_count += 1; + let size = ip.pack_size() as u64; + self.total_pack_size += size; + self.min_pack_size = self.min_pack_size.min(size); + self.max_pack_size = self.max_pack_size.max(size); + } } - let mut tree = Info::default(); - let mut data = Info::default(); + let mut tree = Info { + min_pack_size: u64::MAX, + ..Default::default() + }; + let mut data = Info { + min_pack_size: u64::MAX, + ..Default::default() + }; let mut tree_delete = Info::default(); let mut data_delete = Info::default(); while let Some((_, index)) = stream.try_next().await? 
{ for pack in &index.packs { + match pack.blob_type() { + BlobType::Tree => tree.add_pack(pack), + BlobType::Data => data.add_pack(pack), + } + for blob in &pack.blobs { let ie = IndexEntry::from_index_blob(blob, pack.id); match blob.tpe { @@ -72,7 +95,7 @@ pub(super) async fn execute( let mut table = Table::new(); - table.add_row(row!["Tree",r->tree.count,r->bytes(tree.data_size), r->bytes(tree.size)]); + table.add_row(row!["Tree",r->tree.count,r->bytes(tree.data_size), r->bytes(tree.size) ]); table.add_row(row!["Data",r->data.count,r->bytes(data.data_size),r->bytes(data.size)]); if tree_delete.count > 0 { table.add_row(row!["Tree to delete",r->tree_delete.count,r->bytes(tree_delete.data_size),r->bytes(tree_delete.size)]); @@ -91,6 +114,18 @@ pub(super) async fn execute( println!(); table.printstd(); + let mut table = Table::new(); + table.add_row( + row!["Tree packs", r->tree.pack_count, r->bytes(tree.min_pack_size), r->bytes(tree.max_pack_size)], + ); + table.add_row( + row!["Data packs", r->data.pack_count, r->bytes(data.min_pack_size), r->bytes(data.max_pack_size)], + ); + table.set_titles(row![b->"Blob type", br->"Pack Count", br->"Minimum Size",br->"Maximum Size"]); + table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); + println!(); + table.printstd(); + Ok(()) } diff --git a/src/index/binarysorted.rs b/src/index/binarysorted.rs index 92f9958..b966287 100644 --- a/src/index/binarysorted.rs +++ b/src/index/binarysorted.rs @@ -29,6 +29,8 @@ pub(crate) struct IndexCollector { packs: Vec, tree: Vec, data: SortedHashSetMap, + total_tree_size: u64, + total_data_size: u64, } impl IndexCollector { @@ -42,6 +44,8 @@ impl IndexCollector { packs: Vec::new(), tree: Vec::new(), data, + total_tree_size: 0, + total_data_size: 0, } } @@ -56,6 +60,8 @@ impl IndexCollector { packs: self.packs, tree: self.tree, data: self.data, + total_tree_size: self.total_tree_size, + total_data_size: self.total_data_size, } } } @@ -69,6 +75,12 @@ impl Extend for 
IndexCollector { let idx = self.packs.len(); self.packs.push(p.id); let len = p.blobs.len(); + let blob_type = p.blob_type(); + + match blob_type { + BlobType::Tree => self.total_tree_size += p.pack_size() as u64, + BlobType::Data => self.total_data_size += p.pack_size() as u64, + } match (p.blob_type(), &mut self.data) { (BlobType::Tree, _) => self.tree.reserve(len), @@ -100,6 +112,8 @@ pub struct Index { packs: Vec, tree: Vec, data: SortedHashSetMap, + total_tree_size: u64, + total_data_size: u64, } impl ReadIndex for Index { @@ -123,6 +137,13 @@ impl ReadIndex for Index { }) } + fn total_size(&self, tpe: &BlobType) -> u64 { + match tpe { + BlobType::Tree => self.total_tree_size, + BlobType::Data => self.total_data_size, + } + } + fn has(&self, tpe: &BlobType, id: &Id) -> bool { match (tpe, &self.data) { (BlobType::Tree, _) => self.tree.binary_search_by_key(id, |e| e.id).is_ok(), diff --git a/src/index/mod.rs b/src/index/mod.rs index d6c5069..a01780a 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -70,6 +70,7 @@ impl IndexEntry { #[delegatable_trait] pub trait ReadIndex { fn get_id(&self, tpe: &BlobType, id: &Id) -> Option; + fn total_size(&self, tpe: &BlobType) -> u64; fn get_tree(&self, id: &Id) -> Option { self.get_id(&BlobType::Tree, id) diff --git a/src/repo/configfile.rs b/src/repo/configfile.rs index 9e9d913..b18f094 100644 --- a/src/repo/configfile.rs +++ b/src/repo/configfile.rs @@ -2,6 +2,7 @@ use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; use crate::backend::{FileType, RepoFile}; +use crate::blob::BlobType; use crate::id::Id; #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] @@ -13,12 +14,29 @@ pub struct ConfigFile { pub is_hot: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub compression: Option, // note that Some(0) means no compression. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub treepack_size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub treepack_growfactor: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub datapack_size: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub datapack_growfactor: Option, } impl RepoFile for ConfigFile { const TYPE: FileType = FileType::Config; } +const KB: u32 = 1024; +const MB: u32 = 1024 * KB; +// default pack size +const DEFAULT_TREE_SIZE: u32 = 4 * MB; +const DEFAULT_DATA_SIZE: u32 = 32 * MB; +// the default factor used for repo-size dependent pack size. +// 32 * sqrt(reposize in bytes) = 1 MB * sqrt(reposize in GB) +const DEFAULT_GROW_FACTOR: u32 = 32; + impl ConfigFile { pub fn new(version: u32, id: Id, poly: u64) -> Self { Self { @@ -27,6 +45,10 @@ impl ConfigFile { chunker_polynomial: format!("{:x}", poly), is_hot: None, compression: None, + treepack_size: None, + treepack_growfactor: None, + datapack_size: None, + datapack_growfactor: None, } } @@ -42,4 +64,17 @@ impl ConfigFile { _ => bail!("config version not supported!"), } } + + pub fn packsize(&self, blob: BlobType) -> (u32, u32) { + match blob { + BlobType::Tree => ( + self.treepack_size.unwrap_or(DEFAULT_TREE_SIZE), + self.treepack_growfactor.unwrap_or(DEFAULT_GROW_FACTOR), + ), + BlobType::Data => ( + self.datapack_size.unwrap_or(DEFAULT_DATA_SIZE), + self.datapack_growfactor.unwrap_or(DEFAULT_GROW_FACTOR), + ), + } + } } diff --git a/src/repo/indexfile.rs b/src/repo/indexfile.rs index 55c93f8..bbeb708 100644 --- a/src/repo/indexfile.rs +++ b/src/repo/indexfile.rs @@ -42,6 +42,15 @@ pub struct IndexPack { } impl IndexPack { + // 4 equals the size of blob::packer::PackHeaderLength + // 32 equals the size of the crypto overhead + pub const PACK_OVERHEAD: u32 = 4 + 32; + + // this equals the size of blob::packer::PackHeaderEntry + pub const HEADER_LEN: u32 = 37; + // this equals the size of 
blob::packer::PackHeaderEntryComp + pub const HEADER_LEN_COMPRESSED: u32 = 41; + pub fn set_id(&mut self, id: Id) { self.id = id; } @@ -66,16 +75,13 @@ impl IndexPack { // calculate the pack size from the contained blobs pub fn pack_size(&self) -> u32 { self.size.unwrap_or_else(|| { - self.blobs.iter().fold( - 4 + 32, // 4 + crypto overhead - |acc, blob| { - acc + blob.length - + match blob.uncompressed_length { - None => 37, // 37 = length of blob description for uncompressed blobs - Some(_) => 41, // 41 = length of blob description for compressed blobs - } - }, - ) + self.blobs.iter().fold(Self::PACK_OVERHEAD, |acc, blob| { + acc + blob.length + + match blob.uncompressed_length { + None => Self::HEADER_LEN, + Some(_) => Self::HEADER_LEN_COMPRESSED, + } + }) }) }