mirror of
https://github.com/rustic-rs/rustic.git
synced 2025-10-26 11:18:51 +00:00
Merge pull request #52 from rustic-rs/dynamic-packsize
Allow to customize pack size
This commit is contained in:
commit
96498846ec
32
Cargo.lock
generated
32
Cargo.lock
generated
@ -462,6 +462,26 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-map"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ddfe61e8040145222887d0d32a939c70c8cae681490d72fb868305e9b40ced8"
|
||||
dependencies = [
|
||||
"enum-map-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-map-derive"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00d1c54e25a57236a790ecf051c2befbb57740c9b86c4273eac378ba84d620d6"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "1.7.0"
|
||||
@ -846,6 +866,15 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "integer-sqrt"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "276ec31bcb4a9ee45f58bec6f9ec700ae4cf4f4f8f2fa7e06cb406bd5ffdd770"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.5.0"
|
||||
@ -1366,6 +1395,8 @@ dependencies = [
|
||||
"derive-getters",
|
||||
"derive_more",
|
||||
"dirs 4.0.0",
|
||||
"enum-map",
|
||||
"enum-map-derive",
|
||||
"filetime",
|
||||
"futures",
|
||||
"gethostname",
|
||||
@ -1373,6 +1404,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"ignore",
|
||||
"indicatif",
|
||||
"integer-sqrt",
|
||||
"itertools",
|
||||
"lazy_static",
|
||||
"nix",
|
||||
|
||||
@ -34,8 +34,9 @@ aes256ctr_poly1305aes = "0.1"
|
||||
sha2 = "0.10"
|
||||
rand = "0.8"
|
||||
scrypt = { version = "0.10", default-features = false }
|
||||
# chunker
|
||||
# chunker / packer
|
||||
cdc = "0.1"
|
||||
integer-sqrt = "0.1"
|
||||
# serialization
|
||||
base64 = "0.13"
|
||||
binrw = "0.9"
|
||||
@ -47,6 +48,8 @@ serde-aux = "3"
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tempfile = "3"
|
||||
zstd = "0.11"
|
||||
enum-map = "2"
|
||||
enum-map-derive = "0.9"
|
||||
# local backend
|
||||
walkdir = "2"
|
||||
ignore = "0.4"
|
||||
|
||||
@ -15,7 +15,7 @@ use crate::chunker::ChunkIter;
|
||||
use crate::crypto::hash;
|
||||
use crate::id::Id;
|
||||
use crate::index::{IndexedBackend, Indexer, SharedIndexer};
|
||||
use crate::repo::{SnapshotFile, SnapshotSummary};
|
||||
use crate::repo::{ConfigFile, SnapshotFile, SnapshotSummary};
|
||||
|
||||
use super::{Parent, ParentResult};
|
||||
|
||||
@ -38,23 +38,37 @@ impl<BE: DecryptWriteBackend, I: IndexedBackend> Archiver<BE, I> {
|
||||
pub fn new(
|
||||
be: BE,
|
||||
index: I,
|
||||
poly: u64,
|
||||
config: &ConfigFile,
|
||||
parent: Parent<I>,
|
||||
mut snap: SnapshotFile,
|
||||
zstd: Option<i32>,
|
||||
) -> Result<Self> {
|
||||
let indexer = Indexer::new(be.clone()).into_shared();
|
||||
let mut summary = snap.summary.take().unwrap();
|
||||
summary.backup_start = Local::now();
|
||||
let poly = config.poly()?;
|
||||
|
||||
let data_packer = Packer::new(
|
||||
be.clone(),
|
||||
BlobType::Data,
|
||||
indexer.clone(),
|
||||
config,
|
||||
index.total_size(&BlobType::Data),
|
||||
)?;
|
||||
let tree_packer = Packer::new(
|
||||
be.clone(),
|
||||
BlobType::Tree,
|
||||
indexer.clone(),
|
||||
config,
|
||||
index.total_size(&BlobType::Tree),
|
||||
)?;
|
||||
Ok(Self {
|
||||
path: PathBuf::from("/"),
|
||||
tree: Tree::new(),
|
||||
parent,
|
||||
stack: Vec::new(),
|
||||
index,
|
||||
data_packer: Packer::new(be.clone(), BlobType::Data, indexer.clone(), zstd)?,
|
||||
tree_packer: Packer::new(be.clone(), BlobType::Tree, indexer.clone(), zstd)?,
|
||||
data_packer,
|
||||
tree_packer,
|
||||
be,
|
||||
poly,
|
||||
indexer,
|
||||
|
||||
@ -5,11 +5,14 @@ pub use packer::*;
|
||||
pub use tree::*;
|
||||
|
||||
use derive_more::Constructor;
|
||||
use enum_map::{Enum, EnumMap};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::id::Id;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||
#[derive(
|
||||
Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Enum,
|
||||
)]
|
||||
pub enum BlobType {
|
||||
#[serde(rename = "tree")]
|
||||
Tree,
|
||||
@ -26,6 +29,8 @@ impl BlobType {
|
||||
}
|
||||
}
|
||||
|
||||
pub type BlobTypeMap<T> = EnumMap<BlobType, T>;
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Constructor)]
|
||||
pub struct Blob {
|
||||
tpe: BlobType,
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use integer_sqrt::IntegerSquareRoot;
|
||||
use std::fs::File;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::num::NonZeroU32;
|
||||
@ -11,18 +12,44 @@ use tokio::{spawn, task::JoinHandle};
|
||||
use zstd::encode_all;
|
||||
|
||||
use super::BlobType;
|
||||
use crate::backend::{DecryptWriteBackend, FileType};
|
||||
use crate::backend::{DecryptFullBackend, DecryptWriteBackend, FileType};
|
||||
use crate::crypto::{CryptoKey, Hasher};
|
||||
use crate::id::Id;
|
||||
use crate::index::SharedIndexer;
|
||||
use crate::repo::IndexPack;
|
||||
use crate::repo::{ConfigFile, IndexBlob, IndexPack};
|
||||
|
||||
const KB: u32 = 1024;
|
||||
const MB: u32 = 1024 * KB;
|
||||
const MAX_SIZE: u32 = 4 * MB;
|
||||
// the absolute maximum size of a pack: including headers it should not exceed 4 GB
|
||||
const MAX_SIZE: u32 = 4076 * MB;
|
||||
const MAX_COUNT: u32 = 10_000;
|
||||
const MAX_AGE: Duration = Duration::from_secs(300);
|
||||
|
||||
struct PackSizer {
|
||||
default_size: u32,
|
||||
grow_factor: u32,
|
||||
current_size: u64,
|
||||
}
|
||||
|
||||
impl PackSizer {
|
||||
pub fn from_config(config: &ConfigFile, blob_type: BlobType, current_size: u64) -> Self {
|
||||
let (default_size, grow_factor) = config.packsize(blob_type);
|
||||
Self {
|
||||
default_size,
|
||||
grow_factor,
|
||||
current_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pack_size(&self) -> u32 {
|
||||
(self.current_size.integer_sqrt() as u32 * self.grow_factor + self.default_size)
|
||||
.min(MAX_SIZE)
|
||||
}
|
||||
|
||||
fn add_size(&mut self, added: u32) {
|
||||
self.current_size += added as u64;
|
||||
}
|
||||
}
|
||||
pub struct Packer<BE: DecryptWriteBackend> {
|
||||
be: BE,
|
||||
blob_type: BlobType,
|
||||
@ -35,6 +62,7 @@ pub struct Packer<BE: DecryptWriteBackend> {
|
||||
hasher: Hasher,
|
||||
file_writer: FileWriter<BE>,
|
||||
zstd: Option<i32>,
|
||||
pack_sizer: PackSizer,
|
||||
}
|
||||
|
||||
impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
@ -42,7 +70,8 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
be: BE,
|
||||
blob_type: BlobType,
|
||||
indexer: SharedIndexer<BE>,
|
||||
zstd: Option<i32>,
|
||||
config: &ConfigFile,
|
||||
total_size: u64,
|
||||
) -> Result<Self> {
|
||||
let file_writer = FileWriter {
|
||||
future: None,
|
||||
@ -50,6 +79,8 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
indexer: indexer.clone(),
|
||||
cacheable: blob_type.is_cacheable(),
|
||||
};
|
||||
let zstd = config.zstd()?;
|
||||
let pack_sizer = PackSizer::from_config(config, blob_type, total_size);
|
||||
Ok(Self {
|
||||
be,
|
||||
blob_type,
|
||||
@ -62,6 +93,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
hasher: Hasher::new(),
|
||||
file_writer,
|
||||
zstd,
|
||||
pack_sizer,
|
||||
})
|
||||
}
|
||||
|
||||
@ -79,6 +111,18 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
|
||||
// adds the blob to the packfile; returns the actually added size
|
||||
pub async fn add(&mut self, data: &[u8], id: &Id) -> Result<u64> {
|
||||
// compute size limit based on total size and size bounds
|
||||
let size_limit = self.pack_sizer.pack_size();
|
||||
self.add_with_sizelimit(data, id, size_limit).await
|
||||
}
|
||||
|
||||
// adds the blob to the packfile; returns the actually added size
|
||||
pub async fn add_with_sizelimit(
|
||||
&mut self,
|
||||
data: &[u8],
|
||||
id: &Id,
|
||||
size_limit: u32,
|
||||
) -> Result<u64> {
|
||||
// only add if this blob is not present
|
||||
if self.has(id) {
|
||||
return Ok(0);
|
||||
@ -107,7 +151,9 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
),
|
||||
};
|
||||
|
||||
self.add_raw(&data, id, uncompressed_length).await?;
|
||||
// add using current total_size as repo_size
|
||||
self.add_raw(&data, id, uncompressed_length, size_limit)
|
||||
.await?;
|
||||
Ok(data.len().try_into()?)
|
||||
}
|
||||
|
||||
@ -117,6 +163,7 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
data: &[u8],
|
||||
id: &Id,
|
||||
uncompressed_length: Option<NonZeroU32>,
|
||||
size_limit: u32,
|
||||
) -> Result<()> {
|
||||
let offset = self.size;
|
||||
let len = self.write_data(data).await?;
|
||||
@ -125,7 +172,9 @@ impl<BE: DecryptWriteBackend> Packer<BE> {
|
||||
self.count += 1;
|
||||
|
||||
// check if PackFile needs to be saved
|
||||
if self.count >= MAX_COUNT || self.size >= MAX_SIZE || self.created.elapsed()? >= MAX_AGE {
|
||||
if self.count >= MAX_COUNT || self.size >= size_limit || self.created.elapsed()? >= MAX_AGE
|
||||
{
|
||||
self.pack_sizer.add_size(self.index.pack_size());
|
||||
self.save().await?;
|
||||
self.size = 0;
|
||||
self.count = 0;
|
||||
@ -260,3 +309,65 @@ impl<BE: DecryptWriteBackend> FileWriter<BE> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Repacker<BE: DecryptFullBackend> {
|
||||
be: BE,
|
||||
packer: Packer<BE>,
|
||||
size_limit: u32,
|
||||
}
|
||||
|
||||
impl<BE: DecryptFullBackend> Repacker<BE> {
|
||||
pub fn new(
|
||||
be: BE,
|
||||
blob_type: BlobType,
|
||||
indexer: SharedIndexer<BE>,
|
||||
config: &ConfigFile,
|
||||
total_size: u64,
|
||||
) -> Result<Self> {
|
||||
let packer = Packer::new(be.clone(), blob_type, indexer, config, total_size)?;
|
||||
let size_limit = packer.pack_sizer.pack_size();
|
||||
Ok(Self {
|
||||
be,
|
||||
packer,
|
||||
size_limit,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn add_fast(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
|
||||
let data = self
|
||||
.be
|
||||
.read_partial(
|
||||
FileType::Pack,
|
||||
pack_id,
|
||||
blob.tpe.is_cacheable(),
|
||||
blob.offset,
|
||||
blob.length,
|
||||
)
|
||||
.await?;
|
||||
self.packer
|
||||
.add_raw(&data, &blob.id, blob.uncompressed_length, self.size_limit)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add(&mut self, pack_id: &Id, blob: &IndexBlob) -> Result<()> {
|
||||
let data = self
|
||||
.be
|
||||
.read_encrypted_partial(
|
||||
FileType::Pack,
|
||||
pack_id,
|
||||
blob.tpe.is_cacheable(),
|
||||
blob.offset,
|
||||
blob.length,
|
||||
)
|
||||
.await?;
|
||||
self.packer
|
||||
.add_with_sizelimit(&data, &blob.id, self.size_limit)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn finalize(&mut self) -> Result<()> {
|
||||
self.packer.finalize().await
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,7 +56,6 @@ pub(super) async fn execute(
|
||||
command: String,
|
||||
) -> Result<()> {
|
||||
let time = Local::now();
|
||||
let poly = config.poly()?;
|
||||
let zstd = config.zstd()?;
|
||||
let mut be = DryRunBackend::new(be.clone(), opts.dry_run);
|
||||
be.set_zstd(zstd);
|
||||
@ -129,9 +128,8 @@ pub(super) async fn execute(
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
v1!("starting backup...");
|
||||
let mut archiver = Archiver::new(be, index, poly, parent, snap, zstd)?;
|
||||
let mut archiver = Archiver::new(be, index, &config, parent, snap)?;
|
||||
let p = progress_bytes();
|
||||
p.set_length(size);
|
||||
for item in src {
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use anyhow::{bail, Result};
|
||||
use bytesize::ByteSize;
|
||||
use clap::Parser;
|
||||
|
||||
use crate::backend::DecryptFullBackend;
|
||||
@ -36,6 +37,30 @@ pub(super) struct ConfigOpts {
|
||||
/// set repository version
|
||||
#[clap(long, value_name = "VERSION")]
|
||||
pub set_version: Option<u32>,
|
||||
|
||||
/// Set default packsize for tree packs. rustic tries to always produce packs greater than this value.
|
||||
/// Note that for large repos, this value is grown by the grow factor.
|
||||
/// Defaults to 4 MiB if not set.
|
||||
#[clap(long, value_name = "SIZE")]
|
||||
pub set_treepack_size: Option<ByteSize>,
|
||||
|
||||
/// Set grow factor for tree packs. The default packsize grows by the square root of the reposize
|
||||
/// multiplied with this factor. This means 32 kiB times this factor per square root of reposize in GiB.
|
||||
/// Defaults to 32 (= 1MB per square root of reposize in GiB) if not set.
|
||||
#[clap(long, value_name = "FACTOR")]
|
||||
pub set_treepack_growfactor: Option<u32>,
|
||||
|
||||
/// Set default packsize for data packs. rustic tries to always produce packs greater than this value.
|
||||
/// Note that for large repos, this value is grown by the grow factor.
|
||||
/// Defaults to 32 MiB if not set.
|
||||
#[clap(long, value_name = "SIZE")]
|
||||
pub set_datapack_size: Option<ByteSize>,
|
||||
|
||||
/// set grow factor for data packs. The default packsize grows by the square root of the reposize
|
||||
/// multiplied with this factor. This means 32 kiB times this factor per square root of reposize in GiB.
|
||||
/// Defaults to 32 (= 1MB per square root of reposize in GiB) if not set.
|
||||
#[clap(long, value_name = "FACTOR")]
|
||||
pub set_datapack_growfactor: Option<u32>,
|
||||
}
|
||||
|
||||
impl ConfigOpts {
|
||||
@ -71,6 +96,19 @@ impl ConfigOpts {
|
||||
config.compression = Some(compression);
|
||||
}
|
||||
|
||||
if let Some(size) = self.set_treepack_size {
|
||||
config.treepack_size = Some(size.as_u64().try_into()?);
|
||||
}
|
||||
if let Some(factor) = self.set_treepack_growfactor {
|
||||
config.treepack_growfactor = Some(factor);
|
||||
}
|
||||
if let Some(size) = self.set_datapack_size {
|
||||
config.datapack_size = Some(size.as_u64().try_into()?);
|
||||
}
|
||||
// The data pack grow factor has its own command line option; it must be read from
// `set_datapack_growfactor`, not from the tree pack option.
if let Some(factor) = self.set_datapack_growfactor {
    config.datapack_growfactor = Some(factor);
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -6,12 +6,13 @@ use anyhow::{anyhow, bail, Result};
|
||||
use bytesize::ByteSize;
|
||||
use chrono::{DateTime, Duration, Local};
|
||||
use clap::Parser;
|
||||
use derive_more::Add;
|
||||
use futures::{future, TryStreamExt};
|
||||
use vlog::*;
|
||||
|
||||
use super::{bytes, progress_counter};
|
||||
use crate::backend::{DecryptFullBackend, DecryptReadBackend, FileType};
|
||||
use crate::blob::{BlobType, NodeType, Packer, TreeStreamerOnce};
|
||||
use crate::blob::{BlobType, BlobTypeMap, NodeType, Repacker, TreeStreamerOnce};
|
||||
use crate::id::Id;
|
||||
use crate::index::{IndexBackend, IndexCollector, IndexType, IndexedBackend, Indexer};
|
||||
use crate::repo::{ConfigFile, IndexBlob, IndexFile, IndexPack, SnapshotFile};
|
||||
@ -158,14 +159,13 @@ struct PackStats {
|
||||
repack: u64,
|
||||
keep: u64,
|
||||
}
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone, Copy, Add)]
|
||||
struct SizeStats {
|
||||
used: u64,
|
||||
unused: u64,
|
||||
remove: u64,
|
||||
repack: u64,
|
||||
repackrm: u64,
|
||||
unref: u64,
|
||||
}
|
||||
|
||||
impl SizeStats {
|
||||
@ -185,11 +185,21 @@ struct PruneStats {
|
||||
packs_to_delete: DeleteStats,
|
||||
size_to_delete: DeleteStats,
|
||||
packs: PackStats,
|
||||
blobs: SizeStats,
|
||||
size: SizeStats,
|
||||
blobs: BlobTypeMap<SizeStats>,
|
||||
size: BlobTypeMap<SizeStats>,
|
||||
size_unref: u64,
|
||||
index_files: u64,
|
||||
}
|
||||
|
||||
impl PruneStats {
|
||||
fn total_blobs(&self) -> SizeStats {
|
||||
self.blobs[BlobType::Tree] + self.blobs[BlobType::Data]
|
||||
}
|
||||
fn total_size(&self) -> SizeStats {
|
||||
self.size[BlobType::Tree] + self.size[BlobType::Data]
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PruneIndex {
|
||||
id: Id,
|
||||
@ -265,32 +275,33 @@ impl PrunePack {
|
||||
}
|
||||
|
||||
fn set_todo(&mut self, todo: PackToDo, pi: &PackInfo, stats: &mut PruneStats) {
|
||||
let tpe = self.blob_type;
|
||||
match todo {
|
||||
PackToDo::Undecided => panic!("not possible"),
|
||||
PackToDo::Keep => {
|
||||
stats.blobs.used += pi.used_blobs as u64;
|
||||
stats.blobs.unused += pi.unused_blobs as u64;
|
||||
stats.size.used += pi.used_size as u64;
|
||||
stats.size.unused += pi.unused_size as u64;
|
||||
stats.blobs[tpe].used += pi.used_blobs as u64;
|
||||
stats.blobs[tpe].unused += pi.unused_blobs as u64;
|
||||
stats.size[tpe].used += pi.used_size as u64;
|
||||
stats.size[tpe].unused += pi.unused_size as u64;
|
||||
stats.packs.keep += 1;
|
||||
}
|
||||
PackToDo::Repack => {
|
||||
stats.blobs.used += pi.used_blobs as u64;
|
||||
stats.blobs.unused += pi.unused_blobs as u64;
|
||||
stats.size.used += pi.used_size as u64;
|
||||
stats.size.unused += pi.unused_size as u64;
|
||||
stats.blobs[tpe].used += pi.used_blobs as u64;
|
||||
stats.blobs[tpe].unused += pi.unused_blobs as u64;
|
||||
stats.size[tpe].used += pi.used_size as u64;
|
||||
stats.size[tpe].unused += pi.unused_size as u64;
|
||||
stats.packs.repack += 1;
|
||||
stats.blobs.repack += (pi.unused_blobs + pi.used_blobs) as u64;
|
||||
stats.blobs.repackrm += pi.unused_blobs as u64;
|
||||
stats.size.repack += (pi.unused_size + pi.used_size) as u64;
|
||||
stats.size.repackrm += pi.unused_size as u64;
|
||||
stats.blobs[tpe].repack += (pi.unused_blobs + pi.used_blobs) as u64;
|
||||
stats.blobs[tpe].repackrm += pi.unused_blobs as u64;
|
||||
stats.size[tpe].repack += (pi.unused_size + pi.used_size) as u64;
|
||||
stats.size[tpe].repackrm += pi.unused_size as u64;
|
||||
}
|
||||
|
||||
PackToDo::MarkDelete => {
|
||||
stats.blobs.unused += pi.unused_blobs as u64;
|
||||
stats.size.unused += pi.unused_size as u64;
|
||||
stats.blobs.remove += pi.unused_blobs as u64;
|
||||
stats.size.remove += pi.unused_size as u64;
|
||||
stats.blobs[tpe].unused += pi.unused_blobs as u64;
|
||||
stats.size[tpe].unused += pi.unused_size as u64;
|
||||
stats.blobs[tpe].remove += pi.unused_blobs as u64;
|
||||
stats.size[tpe].remove += pi.unused_size as u64;
|
||||
}
|
||||
PackToDo::Recover => {
|
||||
stats.packs_to_delete.recover += 1;
|
||||
@ -509,13 +520,13 @@ impl Pruner {
|
||||
// if percentage is given, we want to have
|
||||
// unused <= p/100 * size_after = p/100 * (size_used + unused)
|
||||
// which equals (1 - p/100) * unused <= p/100 * size_used
|
||||
(false, LimitOption::Percentage(p)) => (p * self.stats.size.used) / (100 - p),
|
||||
(false, LimitOption::Percentage(p)) => (p * self.stats.total_size().used) / (100 - p),
|
||||
};
|
||||
|
||||
let max_repack = match max_repack {
|
||||
LimitOption::Unlimited => u64::MAX,
|
||||
LimitOption::Size(size) => size.as_u64(),
|
||||
LimitOption::Percentage(p) => (p * self.stats.size.total()),
|
||||
LimitOption::Percentage(p) => (p * self.stats.total_size().total()),
|
||||
};
|
||||
|
||||
self.repack_candidates.sort_unstable_by_key(|rc| rc.0);
|
||||
@ -523,10 +534,11 @@ impl Pruner {
|
||||
for (pi, index_num, pack_num) in std::mem::take(&mut self.repack_candidates) {
|
||||
let pack = &mut self.index_files[index_num].packs[pack_num];
|
||||
|
||||
let repack_size_new = self.stats.size.repack + (pi.unused_size + pi.used_size) as u64;
|
||||
let repack_size_new =
|
||||
self.stats.total_size().repack + (pi.unused_size + pi.used_size) as u64;
|
||||
if repack_size_new >= max_repack
|
||||
|| (pi.blob_type != BlobType::Tree
|
||||
&& self.stats.size.unused_after_prune() < max_unused)
|
||||
&& self.stats.total_size().unused_after_prune() < max_unused)
|
||||
{
|
||||
pack.set_todo(PackToDo::Keep, &pi, &mut self.stats);
|
||||
} else {
|
||||
@ -577,7 +589,7 @@ impl Pruner {
|
||||
|
||||
// all remaining packs in existing_packs are unreferenced packs
|
||||
for size in self.existing_packs.values() {
|
||||
self.stats.size.unref += *size as u64;
|
||||
self.stats.size_unref += *size as u64;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -614,8 +626,8 @@ impl Pruner {
|
||||
|
||||
fn print_stats(&self) {
|
||||
let pack_stat = &self.stats.packs;
|
||||
let blob_stat = &self.stats.blobs;
|
||||
let size_stat = &self.stats.size;
|
||||
let blob_stat = self.stats.total_blobs();
|
||||
let size_stat = self.stats.total_size();
|
||||
|
||||
v2!(
|
||||
"used: {:>10} blobs, {:>10}",
|
||||
@ -657,14 +669,14 @@ impl Pruner {
|
||||
v1!(
|
||||
"unindexed: {:>10} packs, ?? blobs, {:>10}",
|
||||
self.existing_packs.len(),
|
||||
bytes(size_stat.unref)
|
||||
bytes(self.stats.size_unref)
|
||||
);
|
||||
}
|
||||
|
||||
v1!(
|
||||
"total prune: {:>10} blobs, {:>10}",
|
||||
blob_stat.repackrm + blob_stat.remove,
|
||||
bytes(size_stat.repackrm + size_stat.remove + size_stat.unref)
|
||||
bytes(size_stat.repackrm + size_stat.remove + self.stats.size_unref)
|
||||
);
|
||||
v1!(
|
||||
"remaining: {:>10} blobs, {:>10}",
|
||||
@ -720,8 +732,38 @@ impl Pruner {
|
||||
be.set_zstd(zstd);
|
||||
|
||||
let indexer = Indexer::new_unindexed(be.clone()).into_shared();
|
||||
let mut tree_packer = Packer::new(be.clone(), BlobType::Tree, indexer.clone(), zstd)?;
|
||||
let mut data_packer = Packer::new(be.clone(), BlobType::Data, indexer.clone(), zstd)?;
|
||||
|
||||
// Calculate an approximation of sizes after pruning.
|
||||
// The size actually is:
|
||||
// total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead
|
||||
// This is hard/impossible to compute because:
|
||||
// - the size of blobs can change during repacking if compression is changed
|
||||
// - the size of pack headers depends on whether blobs are compressed or not
|
||||
// - we don't know the number of packs generated by repacking
|
||||
// So, we simply use the current size of the blobs and an estimation of the pack
|
||||
// header size.
|
||||
let tree_size_after_prune = self.stats.size[BlobType::Tree].total_after_prune()
|
||||
+ self.stats.blobs[BlobType::Tree].total_after_prune()
|
||||
* IndexPack::HEADER_LEN_COMPRESSED as u64;
|
||||
let data_size_after_prune = self.stats.size[BlobType::Data].total_after_prune()
|
||||
+ self.stats.blobs[BlobType::Data].total_after_prune()
|
||||
* IndexPack::HEADER_LEN_COMPRESSED as u64;
|
||||
|
||||
let mut tree_repacker = Repacker::new(
|
||||
be.clone(),
|
||||
BlobType::Tree,
|
||||
indexer.clone(),
|
||||
&config,
|
||||
tree_size_after_prune,
|
||||
)?;
|
||||
|
||||
let mut data_repacker = Repacker::new(
|
||||
be.clone(),
|
||||
BlobType::Data,
|
||||
indexer.clone(),
|
||||
&config,
|
||||
data_size_after_prune,
|
||||
)?;
|
||||
|
||||
// mark unreferenced packs for deletion
|
||||
if !self.existing_packs.is_empty() {
|
||||
@ -780,38 +822,15 @@ impl Pruner {
|
||||
// don't save duplicate blobs
|
||||
continue;
|
||||
}
|
||||
|
||||
let repacker = match blob.tpe {
|
||||
BlobType::Data => &mut data_repacker,
|
||||
BlobType::Tree => &mut tree_repacker,
|
||||
};
|
||||
if opts.fast_repack {
|
||||
let data = be
|
||||
.read_partial(
|
||||
FileType::Pack,
|
||||
&pack.id,
|
||||
blob.tpe.is_cacheable(),
|
||||
blob.offset,
|
||||
blob.length,
|
||||
)
|
||||
.await?;
|
||||
match blob.tpe {
|
||||
BlobType::Data => &mut data_packer,
|
||||
BlobType::Tree => &mut tree_packer,
|
||||
}
|
||||
.add_raw(&data, &blob.id, blob.uncompressed_length)
|
||||
.await?;
|
||||
repacker.add_fast(&pack.id, blob).await?;
|
||||
} else {
|
||||
let data = be
|
||||
.read_encrypted_partial(
|
||||
FileType::Pack,
|
||||
&pack.id,
|
||||
blob.tpe.is_cacheable(),
|
||||
blob.offset,
|
||||
blob.length,
|
||||
)
|
||||
.await?;
|
||||
match blob.tpe {
|
||||
BlobType::Data => &mut data_packer,
|
||||
BlobType::Tree => &mut tree_packer,
|
||||
}
|
||||
.add(&data, &blob.id)
|
||||
.await?;
|
||||
repacker.add(&pack.id, blob).await?;
|
||||
}
|
||||
}
|
||||
if opts.instant_delete {
|
||||
@ -850,8 +869,8 @@ impl Pruner {
|
||||
}
|
||||
indexes_remove.push(index.id);
|
||||
}
|
||||
tree_packer.finalize().await?;
|
||||
data_packer.finalize().await?;
|
||||
tree_repacker.finalize().await?;
|
||||
data_repacker.finalize().await?;
|
||||
indexer.write().await.finalize().await?;
|
||||
|
||||
if !data_packs_remove.is_empty() {
|
||||
|
||||
@ -8,7 +8,7 @@ use super::{bytes, progress_counter};
|
||||
use crate::backend::{DecryptReadBackend, ReadBackend, ALL_FILE_TYPES};
|
||||
use crate::blob::BlobType;
|
||||
use crate::index::IndexEntry;
|
||||
use crate::repo::IndexFile;
|
||||
use crate::repo::{IndexFile, IndexPack};
|
||||
|
||||
#[derive(Parser)]
|
||||
pub(super) struct Opts;
|
||||
@ -32,6 +32,10 @@ pub(super) async fn execute(
|
||||
count: u64,
|
||||
size: u64,
|
||||
data_size: u64,
|
||||
pack_count: u64,
|
||||
total_pack_size: u64,
|
||||
min_pack_size: u64,
|
||||
max_pack_size: u64,
|
||||
}
|
||||
|
||||
impl Info {
|
||||
@ -40,15 +44,34 @@ pub(super) async fn execute(
|
||||
self.size += *ie.length() as u64;
|
||||
self.data_size += ie.data_length() as u64;
|
||||
}
|
||||
|
||||
/// Counts one more pack and folds its size into the total, minimum and maximum pack size.
fn add_pack(&mut self, ip: &IndexPack) {
    self.pack_count += 1;

    let pack_bytes = u64::from(ip.pack_size());
    self.total_pack_size += pack_bytes;
    if pack_bytes < self.min_pack_size {
        self.min_pack_size = pack_bytes;
    }
    if pack_bytes > self.max_pack_size {
        self.max_pack_size = pack_bytes;
    }
}
|
||||
}
|
||||
|
||||
let mut tree = Info::default();
|
||||
let mut data = Info::default();
|
||||
let mut tree = Info {
|
||||
min_pack_size: u64::MAX,
|
||||
..Default::default()
|
||||
};
|
||||
let mut data = Info {
|
||||
min_pack_size: u64::MAX,
|
||||
..Default::default()
|
||||
};
|
||||
let mut tree_delete = Info::default();
|
||||
let mut data_delete = Info::default();
|
||||
|
||||
while let Some((_, index)) = stream.try_next().await? {
|
||||
for pack in &index.packs {
|
||||
match pack.blob_type() {
|
||||
BlobType::Tree => tree.add_pack(pack),
|
||||
BlobType::Data => data.add_pack(pack),
|
||||
}
|
||||
|
||||
for blob in &pack.blobs {
|
||||
let ie = IndexEntry::from_index_blob(blob, pack.id);
|
||||
match blob.tpe {
|
||||
@ -72,7 +95,7 @@ pub(super) async fn execute(
|
||||
|
||||
let mut table = Table::new();
|
||||
|
||||
table.add_row(row!["Tree",r->tree.count,r->bytes(tree.data_size), r->bytes(tree.size)]);
|
||||
table.add_row(row!["Tree",r->tree.count,r->bytes(tree.data_size), r->bytes(tree.size) ]);
|
||||
table.add_row(row!["Data",r->data.count,r->bytes(data.data_size),r->bytes(data.size)]);
|
||||
if tree_delete.count > 0 {
|
||||
table.add_row(row!["Tree to delete",r->tree_delete.count,r->bytes(tree_delete.data_size),r->bytes(tree_delete.size)]);
|
||||
@ -91,6 +114,18 @@ pub(super) async fn execute(
|
||||
println!();
|
||||
table.printstd();
|
||||
|
||||
let mut table = Table::new();
|
||||
table.add_row(
|
||||
row!["Tree packs", r->tree.pack_count, r->bytes(tree.min_pack_size), r->bytes(tree.max_pack_size)],
|
||||
);
|
||||
table.add_row(
|
||||
row!["Data packs", r->data.pack_count, r->bytes(data.min_pack_size), r->bytes(data.max_pack_size)],
|
||||
);
|
||||
table.set_titles(row![b->"Blob type", br->"Pack Count", br->"Minimum Size",br->"Maximum Size"]);
|
||||
table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR);
|
||||
println!();
|
||||
table.printstd();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@ -29,6 +29,8 @@ pub(crate) struct IndexCollector {
|
||||
packs: Vec<Id>,
|
||||
tree: Vec<SortedEntry>,
|
||||
data: SortedHashSetMap,
|
||||
total_tree_size: u64,
|
||||
total_data_size: u64,
|
||||
}
|
||||
|
||||
impl IndexCollector {
|
||||
@ -42,6 +44,8 @@ impl IndexCollector {
|
||||
packs: Vec::new(),
|
||||
tree: Vec::new(),
|
||||
data,
|
||||
total_tree_size: 0,
|
||||
total_data_size: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,6 +60,8 @@ impl IndexCollector {
|
||||
packs: self.packs,
|
||||
tree: self.tree,
|
||||
data: self.data,
|
||||
total_tree_size: self.total_tree_size,
|
||||
total_data_size: self.total_data_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -69,6 +75,12 @@ impl Extend<IndexPack> for IndexCollector {
|
||||
let idx = self.packs.len();
|
||||
self.packs.push(p.id);
|
||||
let len = p.blobs.len();
|
||||
let blob_type = p.blob_type();
|
||||
|
||||
match blob_type {
|
||||
BlobType::Tree => self.total_tree_size += p.pack_size() as u64,
|
||||
BlobType::Data => self.total_data_size += p.pack_size() as u64,
|
||||
}
|
||||
|
||||
match (p.blob_type(), &mut self.data) {
|
||||
(BlobType::Tree, _) => self.tree.reserve(len),
|
||||
@ -100,6 +112,8 @@ pub struct Index {
|
||||
packs: Vec<Id>,
|
||||
tree: Vec<SortedEntry>,
|
||||
data: SortedHashSetMap,
|
||||
total_tree_size: u64,
|
||||
total_data_size: u64,
|
||||
}
|
||||
|
||||
impl ReadIndex for Index {
|
||||
@ -123,6 +137,13 @@ impl ReadIndex for Index {
|
||||
})
|
||||
}
|
||||
|
||||
fn total_size(&self, tpe: &BlobType) -> u64 {
|
||||
match tpe {
|
||||
BlobType::Tree => self.total_tree_size,
|
||||
BlobType::Data => self.total_data_size,
|
||||
}
|
||||
}
|
||||
|
||||
fn has(&self, tpe: &BlobType, id: &Id) -> bool {
|
||||
match (tpe, &self.data) {
|
||||
(BlobType::Tree, _) => self.tree.binary_search_by_key(id, |e| e.id).is_ok(),
|
||||
|
||||
@ -70,6 +70,7 @@ impl IndexEntry {
|
||||
#[delegatable_trait]
|
||||
pub trait ReadIndex {
|
||||
fn get_id(&self, tpe: &BlobType, id: &Id) -> Option<IndexEntry>;
|
||||
fn total_size(&self, tpe: &BlobType) -> u64;
|
||||
|
||||
fn get_tree(&self, id: &Id) -> Option<IndexEntry> {
|
||||
self.get_id(&BlobType::Tree, id)
|
||||
|
||||
@ -2,6 +2,7 @@ use anyhow::{bail, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::backend::{FileType, RepoFile};
|
||||
use crate::blob::BlobType;
|
||||
use crate::id::Id;
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)]
|
||||
@ -13,12 +14,29 @@ pub struct ConfigFile {
|
||||
pub is_hot: Option<bool>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub compression: Option<i32>, // note that Some(0) means no compression.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub treepack_size: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub treepack_growfactor: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub datapack_size: Option<u32>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub datapack_growfactor: Option<u32>,
|
||||
}
|
||||
|
||||
impl RepoFile for ConfigFile {
|
||||
const TYPE: FileType = FileType::Config;
|
||||
}
|
||||
|
||||
const KB: u32 = 1024;
|
||||
const MB: u32 = 1024 * KB;
|
||||
// default pack size
|
||||
const DEFAULT_TREE_SIZE: u32 = 4 * MB;
|
||||
const DEFAULT_DATA_SIZE: u32 = 32 * MB;
|
||||
// the default factor used for repo-size dependent pack size.
|
||||
// 32 * sqrt(reposize in bytes) = 1 MB * sqrt(reposize in GB)
|
||||
const DEFAULT_GROW_FACTOR: u32 = 32;
|
||||
|
||||
impl ConfigFile {
|
||||
pub fn new(version: u32, id: Id, poly: u64) -> Self {
|
||||
Self {
|
||||
@ -27,6 +45,10 @@ impl ConfigFile {
|
||||
chunker_polynomial: format!("{:x}", poly),
|
||||
is_hot: None,
|
||||
compression: None,
|
||||
treepack_size: None,
|
||||
treepack_growfactor: None,
|
||||
datapack_size: None,
|
||||
datapack_growfactor: None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,4 +64,17 @@ impl ConfigFile {
|
||||
_ => bail!("config version not supported!"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn packsize(&self, blob: BlobType) -> (u32, u32) {
|
||||
match blob {
|
||||
BlobType::Tree => (
|
||||
self.treepack_size.unwrap_or(DEFAULT_TREE_SIZE),
|
||||
self.treepack_growfactor.unwrap_or(DEFAULT_GROW_FACTOR),
|
||||
),
|
||||
BlobType::Data => (
|
||||
self.datapack_size.unwrap_or(DEFAULT_DATA_SIZE),
|
||||
self.datapack_growfactor.unwrap_or(DEFAULT_GROW_FACTOR),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,6 +42,15 @@ pub struct IndexPack {
|
||||
}
|
||||
|
||||
impl IndexPack {
|
||||
// 4 equals the size of blob::packer::PackHeaderLength
|
||||
// 32 equals the size of the crypto overhead
|
||||
pub const PACK_OVERHEAD: u32 = 4 + 32;
|
||||
|
||||
// this equals the size of blob::packer::PackHeaderEntry
|
||||
pub const HEADER_LEN: u32 = 37;
|
||||
// this equals the size of blob::packer::PackHeaderEntryComp
|
||||
pub const HEADER_LEN_COMPRESSED: u32 = 41;
|
||||
|
||||
pub fn set_id(&mut self, id: Id) {
|
||||
self.id = id;
|
||||
}
|
||||
@ -66,16 +75,13 @@ impl IndexPack {
|
||||
// calculate the pack size from the contained blobs
|
||||
pub fn pack_size(&self) -> u32 {
|
||||
self.size.unwrap_or_else(|| {
|
||||
self.blobs.iter().fold(
|
||||
4 + 32, // 4 + crypto overhead
|
||||
|acc, blob| {
|
||||
acc + blob.length
|
||||
+ match blob.uncompressed_length {
|
||||
None => 37, // 37 = length of blob description for uncompressed blobs
|
||||
Some(_) => 41, // 41 = length of blob description for compressed blobs
|
||||
}
|
||||
},
|
||||
)
|
||||
self.blobs.iter().fold(Self::PACK_OVERHEAD, |acc, blob| {
|
||||
acc + blob.length
|
||||
+ match blob.uncompressed_length {
|
||||
None => Self::HEADER_LEN,
|
||||
Some(_) => Self::HEADER_LEN_COMPRESSED,
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user