diff --git a/src/commands/check.rs b/src/commands/check.rs index 10d506d..08a8522 100644 --- a/src/commands/check.rs +++ b/src/commands/check.rs @@ -9,7 +9,7 @@ use super::progress_counter; use crate::backend::{DecryptReadBackend, FileType}; use crate::blob::{NodeType, TreeStreamer}; use crate::index::{IndexBackend, IndexedBackend}; -use crate::repo::{IndexFile, SnapshotFile}; +use crate::repo::{IndexFile, IndexPack, SnapshotFile}; #[derive(Parser)] pub(super) struct Opts { @@ -38,25 +38,33 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts) async fn check_packs(be: &impl DecryptReadBackend) -> Result<()> { let mut packs = HashMap::new(); + let mut process_pack = |p: IndexPack| { + packs.insert(p.id, p.pack_size()); + + // check offsets in index + let mut expected_offset: u32 = 0; + let mut blobs = p.blobs; + blobs.sort_unstable(); + for blob in blobs { + if blob.offset != expected_offset { + eprintln!( + "pack {}: blob {} offset in index: {}, expected: {}", + p.id, blob.id, blob.offset, expected_offset + ); + } + expected_offset += blob.length; + } + }; + // TODO: only read index files once let mut stream = be.stream_all::(progress_counter()).await?; while let Some(index) = stream.next().await { - for p in index?.1.packs { - packs.insert(p.id, p.pack_size()); - - // check offsests in index - let mut expected_offset: u32 = 0; - let mut blobs = p.blobs; - blobs.sort_unstable(); - for blob in blobs { - if blob.offset != expected_offset { - eprintln!( - "pack {}: blob {} offset in index: {}, expected: {}", - p.id, blob.id, blob.offset, expected_offset - ); - } - expected_offset += blob.length; - } + let index = index?.1; + for p in index.packs { + process_pack(p); + } + for p in index.packs_to_delete { + process_pack(p); } } diff --git a/src/commands/prune.rs b/src/commands/prune.rs index d849375..717f6ce 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -6,7 +6,7 @@ use std::str::FromStr; use anyhow::{anyhow, bail, 
Result}; use bytesize::ByteSize; -use chrono::{DateTime, Local}; +use chrono::{DateTime, Duration, Local}; use clap::Parser; use futures::{StreamExt, TryStreamExt}; use vlog::*; @@ -65,7 +65,8 @@ pub(super) async fn execute(be: &(impl DecryptFullBackend + Unpin), opts: Opts) let mut pruner = Pruner::new(used_ids, existing_packs, index_files); pruner.count_used_blobs(); pruner.check()?; - pruner.decide_packs()?; + // TODO: Make this customizable + pruner.decide_packs(Duration::hours(0), Duration::hours(23))?; pruner.decide_repack(&opts.max_repack, &opts.max_unused); pruner.filter_index_files(); pruner.print_stats(); @@ -97,11 +98,23 @@ impl FromStr for LimitOption { } } +#[derive(Default)] +struct DeleteStats { + remove: u64, + recover: u64, + keep: u64, +} + +impl DeleteStats { + fn total(&self) -> u64 { + self.remove + self.recover + self.keep + } +} #[derive(Default)] struct PackStats { used: u64, partly_used: u64, - unused: u64, // this equal the packs-to-remove + unused: u64, // this equals to packs-to-remove repack: u64, keep: u64, } @@ -130,6 +143,7 @@ impl SizeStats { #[derive(Default)] struct PruneStats { + packs_to_delete: DeleteStats, packs: PackStats, blobs: SizeStats, size: SizeStats, @@ -141,7 +155,7 @@ struct PruneIndex { id: Id, modified: bool, packs: Vec, - packs_to_delete: Vec, + packs_to_delete: Vec, } impl PruneIndex { @@ -155,6 +169,7 @@ enum PackToDo { Keep, Repack, Remove, + Recover, } #[derive(Debug)] @@ -166,8 +181,8 @@ struct PrunePack { blobs: Vec, } -#[derive(Default)] struct Pruner { + time: DateTime, used_ids: HashMap, existing_packs: HashMap, repack_candidates: Vec, @@ -182,6 +197,7 @@ impl Pruner { index_files: Vec<(Id, IndexFile)>, ) -> Self { let mut processed_packs = HashSet::new(); + let mut processed_packs_delete = HashSet::new(); let index_files = index_files .into_iter() .map(|(id, index)| { @@ -203,7 +219,24 @@ impl Pruner { blobs: p.blobs, }) .collect(); - let packs_to_delete = index.packs_to_delete; + let packs_to_delete = 
index + .packs_to_delete + .into_iter() + // filter out duplicate packs + .filter(|p| { + let no_duplicate = processed_packs_delete.insert(p.id); + modified |= !no_duplicate; + no_duplicate + }) + .map(|p| PrunePack { + id: p.id, + blob_type: p.blob_type(), + to_do: PackToDo::Keep, + time: p.time, + blobs: p.blobs, + }) + .collect(); + PruneIndex { id, modified, @@ -214,10 +247,12 @@ impl Pruner { .collect(); Self { + time: Local::now(), used_ids, existing_packs, + repack_candidates: Vec::new(), index_files, - ..Default::default() + stats: PruneStats::default(), } } @@ -248,62 +283,104 @@ impl Pruner { Ok(()) } - fn decide_packs(&mut self) -> Result<()> { - for pack in self - .index_files - .iter_mut() - .flat_map(|index| index.packs.iter_mut()) - { - let mut pi = PackInfo::new(pack.blob_type); + fn decide_packs(&mut self, keep_pack: Duration, keep_delete: Duration) -> Result<()> { + for index in self.index_files.iter_mut() { + // decide what to do for "normal" packs + for pack in index.packs.iter_mut() { + let mut pi = PackInfo::new(pack.blob_type); - // check if the pack has used blobs which are no duplicates - let has_used = pack - .blobs - .iter() - .any(|blob| self.used_ids.get(&blob.id) == Some(&1)); + // check if the pack has used blobs which are no duplicates + let has_used = pack + .blobs + .iter() + .any(|blob| self.used_ids.get(&blob.id) == Some(&1)); - for blob in &pack.blobs { - match self.used_ids.get_mut(&blob.id) { - None => pi.add_unused_blob(blob), - Some(count) => pi.add_blob(blob, has_used, count), + for blob in &pack.blobs { + match self.used_ids.get_mut(&blob.id) { + None => pi.add_unused_blob(blob), + Some(count) => pi.add_blob(blob, has_used, count), + } + } + + self.stats.blobs.used += pi.used_blobs as u64; + self.stats.blobs.unused += pi.unused_blobs as u64; + self.stats.size.used += pi.used_size as u64; + self.stats.size.unused += pi.unused_size as u64; + + if pi.used_blobs == 0 { + // unused pack + self.existing_packs.remove(&pack.id); + 
+ self.stats.packs.unused += 1; + if pack.time >= Some(self.time - keep_pack) { + // keep packs which are too young + self.stats.packs.keep += 1; + for blob in &pack.blobs { + self.used_ids.remove(&blob.id); + } + } else { + pack.to_do = PackToDo::Remove; + self.stats.blobs.remove += pi.unused_blobs as u64; + self.stats.size.remove += pi.unused_size as u64; + } + } else { + if self.existing_packs.remove(&pack.id).is_none() { + bail!("used pack {} does not exist!", pack.id); + } + + if pi.unused_blobs == 0 { + // used pack + self.stats.packs.used += 1; + self.stats.packs.keep += 1; + for blob in &pack.blobs { + self.used_ids.remove(&blob.id); + } + } else { + if pack.time > Some(self.time - keep_pack) { + // keep packs which are too young + self.stats.packs.keep += 1; + for blob in &pack.blobs { + self.used_ids.remove(&blob.id); + } + } else { + // partly used pack => candidate for repacking + self.stats.packs.partly_used += 1; + self.repack_candidates + .push(RepackCandidate { id: pack.id, pi }) + } + } } } - self.stats.blobs.used += pi.used_blobs as u64; - self.stats.blobs.unused += pi.unused_blobs as u64; - self.stats.size.used += pi.used_size as u64; - self.stats.size.unused += pi.unused_size as u64; - - if pi.used_blobs == 0 { - // unused pack - self.stats.packs.unused += 1; - pack.to_do = PackToDo::Remove; - self.stats.blobs.remove += pi.unused_blobs as u64; - self.stats.size.remove += pi.unused_size as u64; + // decide what to do for packs in packs_to_delete + for pack in index.packs_to_delete.iter_mut() { + // check if the pack has used blobs which are not referenced in the index of + // packs which are not marked for deletion. 
+ if pack.blobs.iter().fold(false, |acc, blob| { + if let Some(count) = self.used_ids.get_mut(&blob.id) { + if *count == 0 { + *count = 1; + return true; + } + } + return acc; + }) { + // if so, mark this pack for recovery + pack.to_do = PackToDo::Recover; + self.stats.packs_to_delete.recover += 1; + } else if self.time - pack.time.expect("packs_to_delete has no time") >= keep_delete + { + pack.to_do = PackToDo::Remove; + self.stats.packs_to_delete.remove += 1; + } else { + self.stats.packs_to_delete.keep += 1; + } self.existing_packs.remove(&pack.id); - } else { - if self.existing_packs.remove(&pack.id).is_none() { - bail!("used pack {} does not exist!", pack.id); - } - - if pi.unused_blobs == 0 { - // used pack - self.stats.packs.used += 1; - self.stats.packs.keep += 1; - for blob in &pack.blobs { - self.used_ids.remove(&blob.id); - } - } else { - // partly used pack => candidate for repacking - self.stats.packs.partly_used += 1; - self.repack_candidates - .push(RepackCandidate { id: pack.id, pi }) - } } } - // all remaining packs in existing_packs are not needed unindexed packs + // all remaining packs in existing_packs are unindexed packs for size in self.existing_packs.values() { self.stats.size.unref += *size as u64; } @@ -365,11 +442,14 @@ impl Pruner { .into_iter() .filter(|index| { // index must be processed if it has been modified + // or if any pack is not kept let must_modify = index.modified - || index.packs.iter().any(|p| { - // or if packs needs to be removed or repacked. - p.to_do == PackToDo::Repack || p.to_do == PackToDo::Remove - }); + || index.packs.iter().any(|p| p.to_do != PackToDo::Keep) + || index + .packs_to_delete + .iter() + .any(|p| p.to_do != PackToDo::Keep); + any_must_modify |= must_modify; // also process index files which are too small (i.e. 
rebuild them) @@ -453,6 +533,23 @@ impl Pruner { self.index_files.len(), self.stats.index_files ); + + v1!( + "packs marked for deletion: {:>10}", + self.stats.packs_to_delete.total(), + ); + v1!( + " - complete deletion: {:>10}", + self.stats.packs_to_delete.remove, + ); + v1!( + " - keep marked: {:>10}", + self.stats.packs_to_delete.keep, + ); + v1!( + " - recover: {:>10}", + self.stats.packs_to_delete.recover, + ); } async fn do_prune(mut self, be: &impl DecryptWriteBackend) -> Result<()> { @@ -484,9 +581,12 @@ impl Pruner { for index in self.index_files { for pack in index.packs { match pack.to_do { + PackToDo::Recover => { + bail!("not supported!"); + } PackToDo::Repack => { // TODO: repack in parallel - for blob in pack.blobs { + for blob in &pack.blobs { if self.used_ids.remove(&blob.id).is_none() { // don't save duplicate blobs continue; @@ -496,7 +596,13 @@ impl Pruner { .await?; packer.add_raw(&data, &blob.id, blob.tpe).await?; } - packs_remove.push(pack.id) + // mark pack for removal + let pack = IndexPack { + id: pack.id, + time: Some(self.time), + blobs: pack.blobs, + }; + indexer.borrow_mut().add_remove(pack).await?; } PackToDo::Keep => { // keep pack: add to new index @@ -507,7 +613,44 @@ impl Pruner { }; indexer.borrow_mut().add(pack).await?; } - PackToDo::Remove => packs_remove.push(pack.id), + PackToDo::Remove => { + // remove pack: add to new index in section packs_to_delete + let pack = IndexPack { + id: pack.id, + time: Some(self.time), + blobs: pack.blobs, + }; + indexer.borrow_mut().add_remove(pack).await?; + } + } + } + for pack in index.packs_to_delete { + match pack.to_do { + PackToDo::Repack => { + bail!("not supported"); + } + PackToDo::Keep => { + // keep pack: add to new index + let pack = IndexPack { + id: pack.id, + time: pack.time, + blobs: pack.blobs, + }; + indexer.borrow_mut().add_remove(pack).await?; + } + PackToDo::Recover => { + // recover pack: add to new index in section packs + let pack = IndexPack { + id: pack.id, + time: 
Some(self.time), + blobs: pack.blobs, + }; + indexer.borrow_mut().add(pack).await?; + } + PackToDo::Remove => { + // delete pack + packs_remove.push(pack.id) + } } } indexes_remove.push(index.id); diff --git a/src/commands/repoinfo.rs b/src/commands/repoinfo.rs index e143121..322a74e 100644 --- a/src/commands/repoinfo.rs +++ b/src/commands/repoinfo.rs @@ -42,32 +42,59 @@ pub(super) async fn execute(be: &impl DecryptReadBackend, _opts: Opts) -> Result v1!("scanning index..."); let p = progress_counter(); let mut stream = be.stream_all::(p.clone()).await?; - let mut tree_count = 0; - let mut tree_size = 0; - let mut data_count = 0; - let mut data_size = 0; + + #[derive(Default)] + struct Info { + count: u64, + size: u64, + } + + impl Info { + fn add(&mut self, length: u32) { + self.count += 1; + self.size += length as u64; + } + } + + let mut tree = Info::default(); + let mut data = Info::default(); + let mut tree_delete = Info::default(); + let mut data_delete = Info::default(); + while let Some(index) = stream.next().await { - for pack in index?.1.packs { - for blob in pack.blobs { - match blob.tpe { - BlobType::Tree => { - tree_count += 1; - tree_size += blob.length as u64; - } - BlobType::Data => { - data_count += 1; - data_size += blob.length as u64; - } - } + let index = index?.1; + for blob in index.packs.iter().flat_map(|pack| &pack.blobs) { + match blob.tpe { + BlobType::Tree => tree.add(blob.length), + BlobType::Data => data.add(blob.length), + } + } + for blob in index.packs_to_delete.iter().flat_map(|pack| &pack.blobs) { + match blob.tpe { + BlobType::Tree => tree_delete.add(blob.length), + BlobType::Data => data_delete.add(blob.length), } } } p.finish_with_message("done"); let mut table = Table::new(); - table.add_row(row!["Tree",r->tree_count,r->ByteSize(tree_size).to_string_as(true)]); - table.add_row(row!["Data",r->data_count,r->ByteSize(data_size).to_string_as(true)]); - table.add_row(row!["Total",r->tree_count + 
data_count,r->ByteSize(tree_size+data_size).to_string_as(true)]); + table.add_row(row!["Tree",r->tree.count,r->ByteSize(tree.size).to_string_as(true)]); + table.add_row(row!["Data",r->data.count,r->ByteSize(data.size).to_string_as(true)]); + if tree_delete.count > 0 { + table.add_row( + row!["Tree to delete",r->tree_delete.count,r->ByteSize(tree_delete.size).to_string_as(true)], + ); + } + if data_delete.count > 0 { + table.add_row( + row!["Data to delete",r->data_delete.count,r->ByteSize(data_delete.size).to_string_as(true)], + ); + } + table.add_row( + row!["Total",r->tree.count + data.count+tree_delete.count + data_delete.count, + r->ByteSize(tree.size+data.size+tree_delete.size+data_delete.size).to_string_as(true)], + ); table.set_titles(row![b->"Blob type", br->"Count", br->"Total Size"]); table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); diff --git a/src/index/indexer.rs b/src/index/indexer.rs index c6d1790..dff4f79 100644 --- a/src/index/indexer.rs +++ b/src/index/indexer.rs @@ -61,6 +61,14 @@ impl Indexer { } pub async fn add(&mut self, pack: IndexPack) -> Result<()> { + self.add_with(pack, false).await + } + + pub async fn add_remove(&mut self, pack: IndexPack) -> Result<()> { + self.add_with(pack, true).await + } + + pub async fn add_with(&mut self, pack: IndexPack, delete: bool) -> Result<()> { self.count += pack.blobs.len(); if let Some(indexed) = &mut self.indexed { @@ -69,7 +77,7 @@ impl Indexer { } } - self.file.add(pack); + self.file.add(pack, delete); // check if IndexFile needs to be saved if self.count >= MAX_COUNT || self.created.elapsed()? 
>= MAX_AGE { diff --git a/src/repo/indexfile.rs b/src/repo/indexfile.rs index bf8a673..fb493b2 100644 --- a/src/repo/indexfile.rs +++ b/src/repo/indexfile.rs @@ -21,8 +21,12 @@ impl RepoFile for IndexFile { } impl IndexFile { - pub fn add(&mut self, p: IndexPack) { - self.packs.push(p); + pub fn add(&mut self, p: IndexPack, delete: bool) { + if delete { + self.packs_to_delete.push(p); + } else { + self.packs.push(p); + } } }