implement lock-free pruning

This commit is contained in:
Alexander Weiss 2022-04-24 10:51:55 +02:00
parent efb4e9b660
commit 08bbc8688b
5 changed files with 289 additions and 99 deletions

View File

@ -9,7 +9,7 @@ use super::progress_counter;
use crate::backend::{DecryptReadBackend, FileType};
use crate::blob::{NodeType, TreeStreamer};
use crate::index::{IndexBackend, IndexedBackend};
use crate::repo::{IndexFile, SnapshotFile};
use crate::repo::{IndexFile, IndexPack, SnapshotFile};
#[derive(Parser)]
pub(super) struct Opts {
@ -38,25 +38,33 @@ pub(super) async fn execute(be: &(impl DecryptReadBackend + Unpin), opts: Opts)
async fn check_packs(be: &impl DecryptReadBackend) -> Result<()> {
let mut packs = HashMap::new();
let mut process_pack = |p: IndexPack| {
packs.insert(p.id, p.pack_size());
// check offsests in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.offset != expected_offset {
eprintln!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
};
// TODO: only read index files once
let mut stream = be.stream_all::<IndexFile>(progress_counter()).await?;
while let Some(index) = stream.next().await {
for p in index?.1.packs {
packs.insert(p.id, p.pack_size());
// check offsests in index
let mut expected_offset: u32 = 0;
let mut blobs = p.blobs;
blobs.sort_unstable();
for blob in blobs {
if blob.offset != expected_offset {
eprintln!(
"pack {}: blob {} offset in index: {}, expected: {}",
p.id, blob.id, blob.offset, expected_offset
);
}
expected_offset += blob.length;
}
let index = index?.1;
for p in index.packs {
process_pack(p);
}
for p in index.packs_to_delete {
process_pack(p);
}
}

View File

@ -6,7 +6,7 @@ use std::str::FromStr;
use anyhow::{anyhow, bail, Result};
use bytesize::ByteSize;
use chrono::{DateTime, Local};
use chrono::{DateTime, Duration, Local};
use clap::Parser;
use futures::{StreamExt, TryStreamExt};
use vlog::*;
@ -65,7 +65,8 @@ pub(super) async fn execute(be: &(impl DecryptFullBackend + Unpin), opts: Opts)
let mut pruner = Pruner::new(used_ids, existing_packs, index_files);
pruner.count_used_blobs();
pruner.check()?;
pruner.decide_packs()?;
// TODO: Make this customizable
pruner.decide_packs(Duration::hours(0), Duration::hours(23))?;
pruner.decide_repack(&opts.max_repack, &opts.max_unused);
pruner.filter_index_files();
pruner.print_stats();
@ -97,11 +98,23 @@ impl FromStr for LimitOption {
}
}
#[derive(Default)]
struct DeleteStats {
remove: u64,
recover: u64,
keep: u64,
}
impl DeleteStats {
fn total(&self) -> u64 {
self.remove + self.recover + self.keep
}
}
#[derive(Default)]
struct PackStats {
used: u64,
partly_used: u64,
unused: u64, // this equal the packs-to-remove
unused: u64, // this equals to packs-to-remove
repack: u64,
keep: u64,
}
@ -130,6 +143,7 @@ impl SizeStats {
#[derive(Default)]
struct PruneStats {
packs_to_delete: DeleteStats,
packs: PackStats,
blobs: SizeStats,
size: SizeStats,
@ -141,7 +155,7 @@ struct PruneIndex {
id: Id,
modified: bool,
packs: Vec<PrunePack>,
packs_to_delete: Vec<IndexPack>,
packs_to_delete: Vec<PrunePack>,
}
impl PruneIndex {
@ -155,6 +169,7 @@ enum PackToDo {
Keep,
Repack,
Remove,
Recover,
}
#[derive(Debug)]
@ -166,8 +181,8 @@ struct PrunePack {
blobs: Vec<IndexBlob>,
}
#[derive(Default)]
struct Pruner {
time: DateTime<Local>,
used_ids: HashMap<Id, u8>,
existing_packs: HashMap<Id, u32>,
repack_candidates: Vec<RepackCandidate>,
@ -182,6 +197,7 @@ impl Pruner {
index_files: Vec<(Id, IndexFile)>,
) -> Self {
let mut processed_packs = HashSet::new();
let mut processed_packs_delete = HashSet::new();
let index_files = index_files
.into_iter()
.map(|(id, index)| {
@ -203,7 +219,24 @@ impl Pruner {
blobs: p.blobs,
})
.collect();
let packs_to_delete = index.packs_to_delete;
let packs_to_delete = index
.packs_to_delete
.into_iter()
// filter out duplicate packs
.filter(|p| {
let no_duplicate = processed_packs_delete.insert(p.id);
modified |= !no_duplicate;
no_duplicate
})
.map(|p| PrunePack {
id: p.id,
blob_type: p.blob_type(),
to_do: PackToDo::Keep,
time: p.time,
blobs: p.blobs,
})
.collect();
PruneIndex {
id,
modified,
@ -214,10 +247,12 @@ impl Pruner {
.collect();
Self {
time: Local::now(),
used_ids,
existing_packs,
repack_candidates: Vec::new(),
index_files,
..Default::default()
stats: PruneStats::default(),
}
}
@ -248,62 +283,104 @@ impl Pruner {
Ok(())
}
fn decide_packs(&mut self) -> Result<()> {
for pack in self
.index_files
.iter_mut()
.flat_map(|index| index.packs.iter_mut())
{
let mut pi = PackInfo::new(pack.blob_type);
fn decide_packs(&mut self, keep_pack: Duration, keep_delete: Duration) -> Result<()> {
for index in self.index_files.iter_mut() {
// decide what to do for "normal" packs
for pack in index.packs.iter_mut() {
let mut pi = PackInfo::new(pack.blob_type);
// check if the pack has used blobs which are no duplicates
let has_used = pack
.blobs
.iter()
.any(|blob| self.used_ids.get(&blob.id) == Some(&1));
// check if the pack has used blobs which are no duplicates
let has_used = pack
.blobs
.iter()
.any(|blob| self.used_ids.get(&blob.id) == Some(&1));
for blob in &pack.blobs {
match self.used_ids.get_mut(&blob.id) {
None => pi.add_unused_blob(blob),
Some(count) => pi.add_blob(blob, has_used, count),
for blob in &pack.blobs {
match self.used_ids.get_mut(&blob.id) {
None => pi.add_unused_blob(blob),
Some(count) => pi.add_blob(blob, has_used, count),
}
}
self.stats.blobs.used += pi.used_blobs as u64;
self.stats.blobs.unused += pi.unused_blobs as u64;
self.stats.size.used += pi.used_size as u64;
self.stats.size.unused += pi.unused_size as u64;
if pi.used_blobs == 0 {
// unused pack
self.existing_packs.remove(&pack.id);
self.stats.packs.unused += 1;
if pack.time >= Some(self.time - keep_pack) {
// keep packs which are too young
self.stats.packs.keep += 1;
for blob in &pack.blobs {
self.used_ids.remove(&blob.id);
}
} else {
pack.to_do = PackToDo::Remove;
self.stats.blobs.remove += pi.unused_blobs as u64;
self.stats.size.remove += pi.unused_size as u64;
}
} else {
if self.existing_packs.remove(&pack.id).is_none() {
bail!("used pack {} does not exist!", pack.id);
}
if pi.unused_blobs == 0 {
// used pack
self.stats.packs.used += 1;
self.stats.packs.keep += 1;
for blob in &pack.blobs {
self.used_ids.remove(&blob.id);
}
} else {
if pack.time > Some(self.time - keep_pack) {
// keep packs which are too young
self.stats.packs.keep += 1;
for blob in &pack.blobs {
self.used_ids.remove(&blob.id);
}
} else {
// partly used pack => candidate for repacking
self.stats.packs.partly_used += 1;
self.repack_candidates
.push(RepackCandidate { id: pack.id, pi })
}
}
}
}
self.stats.blobs.used += pi.used_blobs as u64;
self.stats.blobs.unused += pi.unused_blobs as u64;
self.stats.size.used += pi.used_size as u64;
self.stats.size.unused += pi.unused_size as u64;
if pi.used_blobs == 0 {
// unused pack
self.stats.packs.unused += 1;
pack.to_do = PackToDo::Remove;
self.stats.blobs.remove += pi.unused_blobs as u64;
self.stats.size.remove += pi.unused_size as u64;
// decide what to do for packs in packs_to_delete
for pack in index.packs_to_delete.iter_mut() {
// check if the pack has used blobs which are not refereced in the index of
// packs which are not marked for deletion.
if pack.blobs.iter().fold(false, |acc, blob| {
if let Some(count) = self.used_ids.get_mut(&blob.id) {
if *count == 0 {
*count = 1;
return true;
}
}
return acc;
}) {
// if so, mark this pack for recovery
pack.to_do = PackToDo::Recover;
self.stats.packs_to_delete.recover += 1;
} else if self.time - pack.time.expect("packs_to_delete has no time") >= keep_delete
{
pack.to_do = PackToDo::Remove;
self.stats.packs_to_delete.remove += 1;
} else {
self.stats.packs_to_delete.keep += 1;
}
self.existing_packs.remove(&pack.id);
} else {
if self.existing_packs.remove(&pack.id).is_none() {
bail!("used pack {} does not exist!", pack.id);
}
if pi.unused_blobs == 0 {
// used pack
self.stats.packs.used += 1;
self.stats.packs.keep += 1;
for blob in &pack.blobs {
self.used_ids.remove(&blob.id);
}
} else {
// partly used pack => candidate for repacking
self.stats.packs.partly_used += 1;
self.repack_candidates
.push(RepackCandidate { id: pack.id, pi })
}
}
}
// all remaining packs in existing_packs are not needed unindexed packs
// all remaining packs in existing_packs are unindexed packs
for size in self.existing_packs.values() {
self.stats.size.unref += *size as u64;
}
@ -365,11 +442,14 @@ impl Pruner {
.into_iter()
.filter(|index| {
// index must be processed if it has been modified
// or if any pack is not kept
let must_modify = index.modified
|| index.packs.iter().any(|p| {
// or if packs needs to be removed or repacked.
p.to_do == PackToDo::Repack || p.to_do == PackToDo::Remove
});
|| index.packs.iter().any(|p| p.to_do != PackToDo::Keep)
|| index
.packs_to_delete
.iter()
.any(|p| p.to_do != PackToDo::Keep);
any_must_modify |= must_modify;
// also process index files which are too small (i.e. rebuild them)
@ -453,6 +533,23 @@ impl Pruner {
self.index_files.len(),
self.stats.index_files
);
v1!(
"packs marked for deletion: {:>10}",
self.stats.packs_to_delete.total(),
);
v1!(
" - complete deletion: {:>10}",
self.stats.packs_to_delete.remove,
);
v1!(
" - keep marked: {:>10}",
self.stats.packs_to_delete.keep,
);
v1!(
" - recover: {:>10}",
self.stats.packs_to_delete.recover,
);
}
async fn do_prune(mut self, be: &impl DecryptWriteBackend) -> Result<()> {
@ -484,9 +581,12 @@ impl Pruner {
for index in self.index_files {
for pack in index.packs {
match pack.to_do {
PackToDo::Recover => {
bail!("not supported!");
}
PackToDo::Repack => {
// TODO: repack in parallel
for blob in pack.blobs {
for blob in &pack.blobs {
if self.used_ids.remove(&blob.id).is_none() {
// don't save duplicate blobs
continue;
@ -496,7 +596,13 @@ impl Pruner {
.await?;
packer.add_raw(&data, &blob.id, blob.tpe).await?;
}
packs_remove.push(pack.id)
// mark pack for removal
let pack = IndexPack {
id: pack.id,
time: Some(self.time),
blobs: pack.blobs,
};
indexer.borrow_mut().add_remove(pack).await?;
}
PackToDo::Keep => {
// keep pack: add to new index
@ -507,7 +613,44 @@ impl Pruner {
};
indexer.borrow_mut().add(pack).await?;
}
PackToDo::Remove => packs_remove.push(pack.id),
PackToDo::Remove => {
// remove pack: add to new index in section packs_to_delete
let pack = IndexPack {
id: pack.id,
time: Some(self.time),
blobs: pack.blobs,
};
indexer.borrow_mut().add_remove(pack).await?;
}
}
}
for pack in index.packs_to_delete {
match pack.to_do {
PackToDo::Repack => {
bail!("not supported");
}
PackToDo::Keep => {
// keep pack: add to new index
let pack = IndexPack {
id: pack.id,
time: pack.time,
blobs: pack.blobs,
};
indexer.borrow_mut().add_remove(pack).await?;
}
PackToDo::Recover => {
// recover pack: add to new index in section packs
let pack = IndexPack {
id: pack.id,
time: Some(self.time),
blobs: pack.blobs,
};
indexer.borrow_mut().add(pack).await?;
}
PackToDo::Remove => {
// delete pack
packs_remove.push(pack.id)
}
}
}
indexes_remove.push(index.id);

View File

@ -42,32 +42,59 @@ pub(super) async fn execute(be: &impl DecryptReadBackend, _opts: Opts) -> Result
v1!("scanning index...");
let p = progress_counter();
let mut stream = be.stream_all::<IndexFile>(p.clone()).await?;
let mut tree_count = 0;
let mut tree_size = 0;
let mut data_count = 0;
let mut data_size = 0;
#[derive(Default)]
struct Info {
count: u64,
size: u64,
}
impl Info {
fn add(&mut self, length: u32) {
self.count += 1;
self.size += length as u64;
}
}
let mut tree = Info::default();
let mut data = Info::default();
let mut tree_delete = Info::default();
let mut data_delete = Info::default();
while let Some(index) = stream.next().await {
for pack in index?.1.packs {
for blob in pack.blobs {
match blob.tpe {
BlobType::Tree => {
tree_count += 1;
tree_size += blob.length as u64;
}
BlobType::Data => {
data_count += 1;
data_size += blob.length as u64;
}
}
let index = index?.1;
for blob in index.packs.iter().flat_map(|pack| &pack.blobs) {
match blob.tpe {
BlobType::Tree => tree.add(blob.length),
BlobType::Data => data.add(blob.length),
}
}
for blob in index.packs_to_delete.iter().flat_map(|pack| &pack.blobs) {
match blob.tpe {
BlobType::Tree => tree_delete.add(blob.length),
BlobType::Data => data_delete.add(blob.length),
}
}
}
p.finish_with_message("done");
let mut table = Table::new();
table.add_row(row!["Tree",r->tree_count,r->ByteSize(tree_size).to_string_as(true)]);
table.add_row(row!["Data",r->data_count,r->ByteSize(data_size).to_string_as(true)]);
table.add_row(row!["Total",r->tree_count + data_count,r->ByteSize(tree_size+data_size).to_string_as(true)]);
table.add_row(row!["Tree",r->tree.count,r->ByteSize(tree.size).to_string_as(true)]);
table.add_row(row!["Data",r->data.count,r->ByteSize(data.size).to_string_as(true)]);
if tree_delete.count > 0 {
table.add_row(
row!["Tree to delete",r->tree_delete.count,r->ByteSize(tree_delete.size).to_string_as(true)],
);
}
if data_delete.count > 0 {
table.add_row(
row!["Data to delete",r->data_delete.count,r->ByteSize(data_delete.size).to_string_as(true)],
);
}
table.add_row(
row!["Total",r->tree.count + data.count+tree_delete.count + data_delete.count,
r->ByteSize(tree.size+data.size+tree_delete.size+data_delete.size).to_string_as(true)],
);
table.set_titles(row![b->"Blob type", br->"Count", br->"Total Size"]);
table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR);

View File

@ -61,6 +61,14 @@ impl<BE: DecryptWriteBackend> Indexer<BE> {
}
pub async fn add(&mut self, pack: IndexPack) -> Result<()> {
self.add_with(pack, false).await
}
pub async fn add_remove(&mut self, pack: IndexPack) -> Result<()> {
self.add_with(pack, true).await
}
pub async fn add_with(&mut self, pack: IndexPack, delete: bool) -> Result<()> {
self.count += pack.blobs.len();
if let Some(indexed) = &mut self.indexed {
@ -69,7 +77,7 @@ impl<BE: DecryptWriteBackend> Indexer<BE> {
}
}
self.file.add(pack);
self.file.add(pack, delete);
// check if IndexFile needs to be saved
if self.count >= MAX_COUNT || self.created.elapsed()? >= MAX_AGE {

View File

@ -21,8 +21,12 @@ impl RepoFile for IndexFile {
}
impl IndexFile {
pub fn add(&mut self, p: IndexPack) {
self.packs.push(p);
pub fn add(&mut self, p: IndexPack, delete: bool) {
if delete {
self.packs_to_delete.push(p);
} else {
self.packs.push(p);
}
}
}