// urbackup_backend/clouddrive/TransactionalKvStore.h
//
// 810 lines
// 22 KiB
// C++

#pragma once
#include <list>
#include <atomic>
#include "../common/lrucache.h"
#include "../Interface/Thread.h"
#include "../Interface/Types.h"
#include "../Interface/Mutex.h"
#include "../Interface/BackupFileSystem.h"
#include <set>
#include "../Interface/Condition.h"
#include <memory>
#include <deque>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <utility>
#include "IOnlineKvStore.h"
#include "../urbackupcommon/os_functions.h"
#ifdef HAS_ASYNC
#include "fuse_io_context.h"
#endif
#include "../common/relaxed_atomic.h"
class IFsFile;
class ICompressEncryptFactory;
// In builds without NDEBUG, define DIRTY_ITEM_CHECK. Further down in this
// header it compiles in the dirty_items / dirty_items_size maps of
// TransactionalKvStore.
#ifndef NDEBUG
#define DIRTY_ITEM_CHECK
#endif
// DIRTY_ITEM(x) expands to x when DIRTY_ITEM_CHECK is defined and to nothing
// otherwise, so checking-only code can be wrapped without its own #ifdef.
#ifdef DIRTY_ITEM_CHECK
#define DIRTY_ITEM(x) x
#else
#define DIRTY_ITEM(x)
#endif
#ifndef HAS_ASYNC
// Stand-in used when HAS_ASYNC is not defined, so that declarations in this
// header which mention fuse_io_context (e.g. delete_item, rm_mem_file) still
// compile without the real fuse_io_context.h.
class fuse_io_context
{
public:
    // The stub unconditionally reports a synchronous thread.
    static bool is_sync_thread()
    {
        return true;
    }
};
#endif
namespace
{
class SubmitWorker;
class ReadOnlyFileWrapper;
class Bitmap;
// Action tag carried by SSubmissionItem::action.
// NOTE(review): the Working_* values presumably mark an item whose plain
// counterpart (Evict/Dirty/Delete/Compress) is currently being processed --
// confirm against the implementation file.
// NOTE(review): this enum is declared inside an anonymous namespace in a
// header, so every translation unit including it gets its own distinct type.
enum ESubmissionAction
{
SubmissionAction_Working_Evict,
SubmissionAction_Working_Dirty,
SubmissionAction_Working_Delete,
SubmissionAction_Working_Compress,
SubmissionAction_Evict,
SubmissionAction_Dirty,
SubmissionAction_Delete,
SubmissionAction_Compress
};
struct SSubmissionItem
{
SSubmissionItem()
: finish(true)
{}
std::string key;
std::vector<std::string> keys;
ESubmissionAction action;
int64 transid;
int64 size;
bool compressed;
bool finish;
};
// Record for an open file: the handle, a size, whether it was opened
// read-only, and a reference count that starts at 1.
//
// Ordering and equality look only at (fd, read_only); `size` and `refcount`
// do not take part in comparisons.
struct SFdKey
{
    SFdKey()
        : fd(nullptr), size(0), read_only(true), refcount(1)
    {
    }

    SFdKey(IFsFile* fd, int64 size, bool read_only)
        : fd(fd), size(size), read_only(read_only), refcount(1)
    {
    }

    IFsFile* fd;
    int64 size;
    bool read_only;
    int refcount;

    bool operator==(const SFdKey& other) const
    {
        return fd == other.fd && read_only == other.read_only;
    }

    // Orders by handle first; for the same handle, writable (false) sorts
    // before read-only (true).
    bool operator<(const SFdKey& other) const
    {
        if (fd != other.fd)
        {
            return fd < other.fd;
        }
        return read_only < other.read_only;
    }
};
// Plain aggregate pairing a key with a submit time.
// NOTE(review): unit/clock of `submittime` is not visible in this header --
// confirm at the use sites. The member has no default value.
struct SubmittedItem
{
std::string key;
int64 submittime;
};
// Bookkeeping entry for one in-memory file (value type of the `memfiles`
// LRU cache in TransactionalKvStore).
struct SMemFile
{
    // Empty entry: no file, size 0, compsize -1, not evicted, not cow.
    SMemFile() {}

    // Wraps `file` in a shared_ptr, which thereby takes ownership of it.
    SMemFile(IFsFile* file, const std::string& key, int64 size)
        : file(file), size(size), key(key) {}

    std::shared_ptr<IFsFile> file;
    std::shared_ptr<IFsFile> old_file;
    int64 size = 0;
    // -1 until set. NOTE(review): presumably "compressed size unknown" -- confirm.
    int64 compsize = -1;
    bool evicted = false;
    bool cow = false;
    std::string key;
};
}
#ifndef NDEBUG
// Debug-build mutex that remembers which thread currently holds it.
//
// It is a non-recursive std::mutex plus an owner id. lock()/try_lock()
// record the calling thread after acquiring, unlock() aborts the process if
// the caller is not the recorded owner, and has_lock() lets callers assert
// ownership. has_lock() exists only on this type, not on the
// std::recursive_mutex used as cache_mutex_t when NDEBUG is defined.
//
// try_lock() is provided here because the inherited std::mutex::try_lock()
// would acquire the mutex without recording the owner: has_lock() would
// then report false and the matching unlock() would abort.
//
// NOTE(review): has_lock() reads `lid` without holding the mutex. A thread
// can only observe its own id if it stored it itself, but the read is still
// unsynchronized with writes from other threads.
class test_mutex : public std::mutex
{
public:
    test_mutex() noexcept
        : std::mutex() {}
    test_mutex(const test_mutex&) = delete;
    test_mutex& operator=(const test_mutex&) = delete;

    // Blocks until acquired, then records the caller as owner.
    void lock() {
        std::mutex::lock();
        // We just acquired the mutex, so nobody may be recorded as owner.
        if (lid != std::thread::id())
            abort();
        lid = std::this_thread::get_id();
    }

    // Non-blocking acquire; records the caller as owner on success.
    bool try_lock() {
        if (!std::mutex::try_lock())
            return false;
        if (lid != std::thread::id())
            abort();
        lid = std::this_thread::get_id();
        return true;
    }

    // Releases the mutex; aborts if the caller is not the recorded owner.
    void unlock() {
        if (lid != std::this_thread::get_id())
            abort();
        // Clear the owner before releasing so the next locker sees "no owner".
        lid = std::thread::id();
        std::mutex::unlock();
    }

    // True if the calling thread is the recorded owner.
    bool has_lock() const {
        return lid == std::this_thread::get_id();
    }

private:
    std::thread::id lid = std::thread::id();
};
using cache_mutex_t = test_mutex;
#else
using cache_mutex_t = std::recursive_mutex;
#endif
class TransactionalKvStore : public IThread
{
class SubmitWorker;
class RetrievalOperation;
class RetrievalOperationNoLock;
class RetrievalOperationUnlockOnly;
public:
// Per-key value stored in the LRU caches of this class (lru_cache and
// compressed_items). Both fields are bit-fields sharing a single byte's
// worth of bits: 1 bit for `dirty`, 7 bits for `chances`.
struct SCacheVal
{
SCacheVal()
: dirty(false), chances(0)
{}
bool dirty : 1;
// 7 bits wide, so the largest representable value is 127.
unsigned char chances : 7;
};
// Callback interface installed with set_num_second_chances_callback().
//
// The destructor is virtual so that an implementation can be destroyed
// safely through a pointer to this interface.
class INumSecondChancesCallback
{
public:
    virtual ~INumSecondChancesCallback() = default;

    // Returns the number of second chances to use for `key`.
    // NOTE(review): presumably consulted during cache eviction -- confirm.
    virtual unsigned int get_num_second_chances(const std::string& key) = 0;

    // Returns whether `key` names a metadata item.
    virtual bool is_metadata(const std::string& key) = 0;
};
// Constructs the store. Several parameters have identically named data
// members further down in this class (cachefs, min_cachesize, min_free_size,
// critical_free_size, throttle_free_size, comp_percent, ...).
// NOTE(review): the exact meaning and units of each parameter are defined by
// the implementation file, which is not visible from this header.
// `cachefs`, `online_kv_store` and `compress_encrypt_factory` are passed as
// raw pointers. NOTE(review): presumably not owned by the store -- confirm.
TransactionalKvStore(IBackupFileSystem* cachefs, int64 min_cachesize, int64 min_free_size, int64 critical_free_size,
int64 throttle_free_size, int64 min_metadata_cache_free, float comp_percent, int64 comp_start_limit,
IOnlineKvStore* online_kv_store, const std::string& encryption_key, ICompressEncryptFactory* compress_encrypt_factory, bool verify_cache,
float cpu_multiplier, size_t no_compress_mult, bool with_prev_link, bool allow_evict, bool with_submitted_files,
float resubmit_compressed_ratio, int64 max_memfile_size, std::string memcache_path,
float memory_usage_factor, bool only_memfiles, unsigned int background_comp_method,
unsigned int cache_comp, unsigned int meta_cache_comp);
~TransactionalKvStore();
// Three-state value passed as `bitmap_present` to get() / get_async().
// NOTE(review): presumably tells the store whether an external bitmap marks
// the key as present, absent, or gives no information -- confirm.
enum class BitmapInfo
{
Present,
NotPresent,
Unknown
};
// Single-bit constants, each a distinct power of two so they can be OR-ed
// together. NOTE(review): presumably the values accepted by the `flags`
// argument of get() / get_async() -- confirm.
class Flag
{
public:
    static constexpr unsigned int disable_fd_cache   = 1u << 0; // 1
    static constexpr unsigned int disable_throttling = 1u << 1; // 2
    static constexpr unsigned int prioritize_read    = 1u << 2; // 4
    static constexpr unsigned int read_random        = 1u << 3; // 8
    static constexpr unsigned int read_only          = 1u << 4; // 16
    static constexpr unsigned int preload_once       = 1u << 5; // 32
    static constexpr unsigned int disable_memfiles   = 1u << 6; // 64
};
// ---- Item access ----
// Returns a file handle for `key`.
// NOTE(review): `flags` presumably takes OR-ed Flag constants, and a handle
// obtained here is presumably given back with release(key) -- confirm.
IFsFile* get(const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag=0);
void release(const std::string& key);
#ifdef HAS_ASYNC
// NOTE(review): coroutine taking `key` by reference -- the caller must keep
// the string alive until the task completes (del_async below takes its key
// by value instead).
fuse_io_context::io_uring_task<void> release_async(fuse_io_context& io, const std::string& key);
#endif
bool del(const std::string& key);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> del_async(fuse_io_context& io, const std::string key);
#endif
bool set_second_chances(const std::string& key, unsigned int chances);
bool has_preload_once(const std::string& key);
bool has_item_cached(const std::string& key);
void remove_preload_items(int preload_tag);
void dirty_all();
// ---- Transactions / lifecycle ----
bool checkpoint(bool do_submit, size_t checkpoint_retry_n);
// Call operator declared alongside the IThread base class.
// NOTE(review): presumably the IThread entry point -- confirm.
void operator()();
void stop();
// ---- Size and state queries ----
int64 get_dirty_bytes();
int64 get_submitted_bytes();
int64 get_total_submitted_bytes();
// Both return a map keyed by int64 (the members they mirror, num_dirty_items
// and num_mem_files, are declared with the same type).
std::map<int64, size_t> get_num_dirty_items();
std::map<int64, size_t> get_num_memfile_items();
int64 get_cache_size();
int64 get_comp_bytes();
void reset();
// NOTE(review): the _nolock / _async variants presumably differ only in how
// (or whether) they take the cache mutex -- confirm before calling.
bool is_congested();
bool is_congested_nolock();
bool is_congested_async();
int64 get_memfile_bytes();
int64 get_submitted_memfile_bytes();
bool is_memfile_complete(int64 ttransid);
std::string meminfo();
void shrink_mem();
// ---- Statistics counters ----
int64 get_total_hits();
int64 get_total_hits_async();
int64 get_total_memory_hits();
int64 get_total_memory_hits_async();
int64 get_total_cache_miss_backend();
int64 get_total_cache_miss_decompress();
int64 get_total_dirty_ops();
int64 get_total_put_ops();
int64 get_total_compress_ops();
// ---- Runtime configuration ----
// NOTE(review): `disablems` presumably is a duration in milliseconds -- confirm.
void disable_compression(int64 disablems);
// Takes a raw pointer. NOTE(review): presumably not owned by the store -- confirm.
void set_num_second_chances_callback(INumSecondChancesCallback* cb);
int64 get_transid();
int64 get_basetransid();
void set_max_cachesize(int64 ns);
int64 cache_total_space();
void set_disable_read_memfiles(bool b);
void set_disable_write_memfiles(bool b);
#ifdef HAS_ASYNC
// Coroutine counterpart of get() with the same parameters plus the io context.
// NOTE(review): `key` is taken by reference and must outlive the task.
fuse_io_context::io_uring_task<IFsFile*> get_async(fuse_io_context& io, const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag=0);
#endif
// Called by RetrievalAwaiter::await_suspend before it links itself into the
// waiter list. NOTE(review): presumably asserts the cache mutex is not held
// by the calling thread -- confirm.
void check_mutex_not_held();
private:
// Functions below that take a std::unique_lock<cache_mutex_t>& receive the
// caller's lock object by reference.
// NOTE(review): presumably the lock is expected to be held on entry and may
// be released and re-acquired inside -- confirm per function.
// Same parameter list as the public get(), without the default argument.
IFsFile* get_internal(const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag);
IFsFile* get_retrieve(const std::string& key, BitmapInfo bitmap_present, unsigned int flags,
int64 size_hint, SFdKey* res, bool& cache_fd, std::unique_lock<cache_mutex_t>& cache_lock);
int64 set_active_transactions(std::unique_lock<cache_mutex_t>& cache_lock, bool continue_incomplete);
void cleanup(bool init);
void read_keys(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& tpath, bool verify);
void read_missing();
void remove_transaction(int64 transid);
// ---- Path / name helpers (all return std::string) ----
std::string keypath2(const std::string& key, int64 transaction_id);
std::string transpath2();
std::string basepath2();
std::string hex(const std::string& key);
std::string hexpath(const std::string& key);
// ---- Eviction ----
// The iterator types below are iterators into a
// std::list<std::pair<std::string const*, SCacheVal>>.
// NOTE(review): presumably the internal list of common::lrucache -- confirm.
bool evict_one(std::unique_lock<cache_mutex_t>& cache_lock, bool break_on_skip, bool only_non_dirty,
std::list<std::pair<std::string const *, SCacheVal> >::iterator& evict_it,
common::lrucache<std::string, SCacheVal>& target_cache, bool use_chances,
int64& freed_space,
bool& run_del_items, std::vector<std::list<std::pair<std::string const *, SCacheVal> >::iterator>& move_front,
bool& used_chance);
void evict_move_front(common::lrucache<std::string, SCacheVal>& target_cache,
std::vector<std::list<std::pair<std::string const *, SCacheVal> >::iterator>& move_front);
void evict_item(const std::string& key, bool dirty,
common::lrucache<std::string, SCacheVal>& target_cache,
std::list<std::pair<std::string const *, SCacheVal> >::iterator* evict_it,
std::unique_lock<cache_mutex_t>& cache_lock, const std::string& from, int64& freed_space);
bool evict_memfiles(std::unique_lock<cache_mutex_t>& cache_lock, bool evict_dirty);
void check_deleted(int64 transid, const std::string& key, bool comp);
bool compress_one(std::unique_lock<cache_mutex_t>& cache_lock,
std::list<std::pair<std::string const *, SCacheVal> >::iterator& compress_it);
// ---- Submission queue interface ----
// These exchange iterators into a std::list<SSubmissionItem>.
// NOTE(review): presumably used by the submit workers, and
// no_submission_item() presumably returns the "nothing to do" sentinel
// iterator -- confirm.
std::list<SSubmissionItem>::iterator next_submission_item(bool no_compress, bool prefer_non_delete, bool prefer_mem, std::string& path, bool& p_do_stop, SMemFile*& memf);
std::list<SSubmissionItem>::iterator no_submission_item();
bool item_submitted(std::list<SSubmissionItem>::iterator it, bool can_delete, bool is_memf);
bool item_compressed(std::list<SSubmissionItem>::iterator it, bool compression_error, int64 size_diff, int64 add_comp_bytes, bool is_memf);
// Mode argument of delete_item(); its `del_imm` parameter defaults to None.
// NOTE(review): NoUnlock / Unlock presumably select immediate deletion
// without / with releasing the cache lock -- confirm.
enum class DeleteImm
{
None,
NoUnlock,
Unlock
};
// Deletes an item. `io` is a pointer, unlike the fuse_io_context& taken by
// the *_async functions. NOTE(review): presumably it may be null on
// synchronous paths -- confirm.
// All parameters after `cache_lock` are optional and default to
// 0 / DeleteImm::None / false.
void delete_item(fuse_io_context* io, const std::string& key, bool compressed_item,
std::unique_lock<cache_mutex_t>& cache_lock,
int64 force_delete_transid=0, int64 skip_transid=0, DeleteImm del_imm = DeleteImm::None,
int64 delete_only=0, bool rm_submitted=false, int64 ignore_sync_wait_transid=0);
void run_del_file_queue();
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> run_del_file_queue_async(fuse_io_context& io);
#endif
void wait_for_del_file(const std::string& fn);
// ---- Persisted dirty / deleted / submitted state ----
// Each takes a file name `fn` and returns bool.
// NOTE(review): presumably true on success -- confirm.
bool read_dirty_items(std::unique_lock<cache_mutex_t>& cache_lock, int64 transaction_id, int64 attibute_to_trans_id);
bool read_submitted_evicted_files(const std::string& fn, std::set<std::string>& sub_evict);
bool read_from_dirty_file(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& fn, int64 transaction_id, bool do_submit, int64 nosubmit_transid);
bool write_to_dirty_file(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& fn, bool do_submit, int64 new_trans,
size_t& num_memf_dirty, std::map<std::string, size_t>& memf_dirty_items,
std::map<std::string, int64>& memf_dirty_items_size);
bool read_from_deleted_file(const std::string& fn, int64 transaction_id, bool do_submit);
bool write_to_deleted_file(const std::string& fn, bool do_submit);
void submit_dummy(int64 transaction_id);
void clean_snapshot( int64 new_trans );
// ---- Waiting for in-flight retrievals ----
// These take the caller's lock by reference.
// NOTE(review): presumably so it can be released while waiting -- confirm.
void wait_for_all_retrievals(std::unique_lock<cache_mutex_t>& lock);
bool wait_for_retrieval(std::unique_lock<cache_mutex_t>& lock, const std::string& key);
bool wait_for_retrieval_poll(std::unique_lock<cache_mutex_t>& lock, const std::string& key);
// Result type of wait_for_retrieval_async(): carries a lock object together
// with a flag that starts out false.
// NOTE(review): `waited` presumably reports whether the call actually had to
// wait -- confirm.
struct RetrievalRes
{
    RetrievalRes() {}

    std::unique_lock<cache_mutex_t> lock;
    bool waited = false;
};
#ifdef HAS_ASYNC
// Takes the lock and the key by value and returns the lock inside RetrievalRes.
fuse_io_context::io_uring_task<RetrievalRes> wait_for_retrieval_async(fuse_io_context& io, std::unique_lock<cache_mutex_t> lock, const std::string key);
#endif
// ---- Retrieval bookkeeping ----
// NOTE(review): presumably these maintain the `in_retrieval` map -- confirm.
void initiate_retrieval(const std::string& key);
void finish_retrieval(const std::string& key);
#ifdef HAS_ASYNC
// Not a coroutine (returns void); takes ownership of the lock object.
void finish_retrieval_async(fuse_io_context& io, std::unique_lock<cache_mutex_t> lock, const std::string& key);
#endif
// ---- Compression ----
// size_diff / dst_size / src_size are output parameters (non-const references).
bool compress_item(const std::string& key, int64 transaction_id, IFsFile* src, int64& size_diff, int64& dst_size, bool sync);
bool decompress_item(const std::string& key, int64 transaction_id, IFsFile* tmpl_file, int64& size_diff, int64& src_size, bool sync);
void remove_compression_evicition_submissions(std::unique_lock<cache_mutex_t>& cache_lock);
void remove_curr_trans_submission(std::unique_lock<cache_mutex_t>& cache_lock, std::list<SSubmissionItem>::iterator it);
void wait_for_compressions_evictions(std::unique_lock<cache_mutex_t>& lock);
// ---- Dirty-item and size accounting ----
// Note: `key` is passed by value in these helpers.
void addDirtyItem(int64 transid, std::string key, bool with_stats=true);
void removeDirtyItem(int64 transid, std::string key);
void add_dirty_bytes(int64 transid, std::string key, int64 b);
void rm_dirty_bytes(int64 transid, std::string key, int64 b, bool rm, bool change_size=true);
void add_cachesize(int64 toadd);
void sub_cachesize(int64 tosub);
bool is_sync_wait_item(const std::string& key);
bool is_sync_wait_item(const std::string& key, int64 transid);
void check_submission_items();
void update_transactions();
// ---- Per-file I/O hints ----
void drop_cache(IFsFile* fd);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> drop_cache_async(fuse_io_context& io, IFsFile* fd);
#endif
void set_read_random(IFsFile* fd);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> set_read_random_async(fuse_io_context& io, IFsFile* fd);
#endif
// ---- Submitted / evicted item records ----
// `fd_cache` is a pointer to a unique_ptr<IFile>.
// NOTE(review): presumably lets the callee reuse an already opened record
// file across calls -- confirm.
bool add_submitted_item(int64 transid, std::string tkey, std::unique_ptr<IFile>* fd_cache);
bool add_evicted_item(int64 transid, std::string tkey);
bool add_item(std::string fn, int64 transid, std::string tkey, std::unique_ptr<IFile>* fd_cache);
// ---- In-memory files ----
IFsFile* get_mem_file(const std::string& key, int64 size_hint, bool for_read);
bool has_memfile_stat(const std::string& key);
void add_memfile_stat(const std::string& key);
bool rm_mem_file(fuse_io_context* io, int64 transid, const std::string& key, bool rm_submitted);
// Takes an iterator into the `submission_items` index map.
void rm_submission_item(std::map<std::pair<int64, std::string>, std::list<SSubmissionItem>::iterator >::iterator it);
bool cow_mem_file(SMemFile* memf, bool with_old_file);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> cow_mem_file_async(fuse_io_context& io, SMemFile* memf, bool with_old_file);
#endif
void remove_missing(const std::string& key);
int64 get_compsize(const std::string& key, int64 transid);
void only_memfiles_throttle(const std::string& key, std::unique_lock<cache_mutex_t>& lock);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> only_memfiles_throttle_async(fuse_io_context& io, const std::string key);
#endif
// ---- Cache filesystem helpers ----
// NOTE(review): presumably these operate on `cachefs` -- confirm.
std::string readCacheFile(const std::string& fn);
bool cacheFileExists(const std::string& fn);
bool writeToCacheFile(const std::string& str, const std::string& fn);
// Build an SCacheVal with the given dirty flag.
// NOTE(review): the "_nc" variant takes no key; presumably "no chances" --
// confirm.
TransactionalKvStore::SCacheVal cache_val(const std::string& key, bool dirty);
TransactionalKvStore::SCacheVal cache_val_nc(bool dirty);
bool set_cache_file_compression(const std::string& key, const std::string& fpath);
// ---- Submission queue mutation ----
// All return or take iterators into `submission_queue`.
std::list<SSubmissionItem>::iterator submission_queue_add(SSubmissionItem& item, bool memfile);
std::list<SSubmissionItem>::iterator submission_queue_insert(SSubmissionItem& item, bool memfile, std::list<SSubmissionItem>::iterator it);
void submission_queue_rm(std::list<SSubmissionItem>::iterator it);
int64 cache_free_space();
// Helper thread object bound to its owning store. It starts with do_quit ==
// false; quit() and the thread body are defined outside this header.
// NOTE(review): `cond` is a condition_variable_any, presumably waited on
// with the store's cache mutex -- confirm.
class RegularSubmitBundleThread : public IThread
{
    TransactionalKvStore* kv_store;
    bool do_quit = false;
    std::condition_variable_any cond;

    void regular_submit_bundle(std::unique_lock<cache_mutex_t>& cache_lock);

public:
    RegularSubmitBundleThread(TransactionalKvStore* kv_store)
        : kv_store(kv_store) {}

    void operator()();
    void quit();
};
// Helper thread object bound to its owning store; all members are public.
// It starts with do_quit == false; quit() and the thread body are defined
// outside this header.
class MetadataUpdateThread : public IThread
{
public:
    TransactionalKvStore* kv_store;
    bool do_quit = false;
    std::condition_variable_any cond;

    MetadataUpdateThread(TransactionalKvStore* kv_store)
        : kv_store(kv_store) {}

    void operator()();
    void quit();
};
// Helper thread object bound to its owning store; all members are public.
// It starts with do_quit == false; quit() and the thread body are defined
// outside this header.
class ThrottleThread : public IThread
{
public:
    TransactionalKvStore* kv_store;
    bool do_quit = false;
    std::condition_variable_any cond;

    ThrottleThread(TransactionalKvStore* kv_store)
        : kv_store(kv_store) {}

    void operator()();
    void quit();
};
// Thread object that collects shared_ptr<IFsFile> handles handed over via
// del(). The thread body (operator()) is defined outside this header.
// NOTE(review): presumably it drains `del_fds` so the final release of each
// file happens on this thread instead of the caller's -- confirm.
class MemfdDelThread : public IThread
{
    std::mutex mutex;               // guards del_fds and do_quit
    std::condition_variable cond;   // signalled on new work and on quit
    std::vector<std::shared_ptr<IFsFile> > del_fds;
    bool do_quit = false;
public:
    MemfdDelThread() {
    }

    // Sets the quit flag under the mutex and wakes every waiter.
    void quit() {
        std::scoped_lock lock(mutex);
        do_quit = true;
        cond.notify_all();
    }

    // Hands `fd` over to the queue.
    //
    // While more than 1000 handles are pending, the caller backs off: it
    // drops the mutex, calls Server->wait(100), and re-checks.
    //
    // The handle is moved into the queue. `fd` is a named rvalue reference
    // and therefore an lvalue, so without std::move the shared_ptr would be
    // copied and the caller's pointer would keep its reference instead of
    // giving it up.
    void del(std::shared_ptr<IFsFile>&& fd)
    {
        std::unique_lock lock(mutex);
        while (del_fds.size() > 1000)
        {
            lock.unlock();
            Server->wait(100);
            lock.lock();
        }
        del_fds.push_back(std::move(fd));
        cond.notify_all();
    }

    void operator()();
};
#ifdef HAS_ASYNC
// Node of an intrusive singly linked list of suspended coroutines.
struct AwaiterCoList
{
AwaiterCoList* next;
std::coroutine_handle<> awaiter;
};
// Head of the list of coroutines waiting for a retrieval; nullptr when empty.
// Nodes are pushed by RetrievalAwaiter::await_suspend and the whole list is
// detached and resumed by resume_retrieval_awaiters().
AwaiterCoList* retrieval_waiters_head = nullptr;
// Awaitable that always suspends the awaiting coroutine and links it into
// kv_store.retrieval_waiters_head. The list node is a member of this object,
// so the awaiter must stay alive while it is linked; it is neither copyable
// nor movable because the list stores the node's address.
struct RetrievalAwaiter
{
RetrievalAwaiter(TransactionalKvStore& kv_store)
: kv_store(kv_store) {}
RetrievalAwaiter(RetrievalAwaiter const&) = delete;
RetrievalAwaiter(RetrievalAwaiter&& other) = delete;
RetrievalAwaiter& operator=(RetrievalAwaiter&&) = delete;
RetrievalAwaiter& operator=(RetrievalAwaiter const&) = delete;
// Never ready: co_await on this object always suspends.
bool await_ready() const noexcept
{
return false;
}
// Stores the coroutine handle in the embedded node and pushes the node onto
// the front of the store's waiter list (so waiters are later visited in
// reverse order of suspension).
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
kv_store.check_mutex_not_held();
awaiter.awaiter = p_awaiter;
awaiter.next = kv_store.retrieval_waiters_head;
kv_store.retrieval_waiters_head = &awaiter;
}
// Produces no value on resumption.
void await_resume() const noexcept
{
}
private:
// `awaiter.next` is only assigned in await_suspend, not in the constructor.
AwaiterCoList awaiter;
TransactionalKvStore& kv_store;
};
void resume_retrieval_awaiters()
{
AwaiterCoList* curr = retrieval_waiters_head;
retrieval_waiters_head = nullptr;
while (curr != nullptr)
{
AwaiterCoList* next = curr->next;
curr->awaiter.resume();
curr = next;
}
}
#endif
// ---- Cache index and open files ----
// NOTE(review): which mutex guards which member is not expressed in this
// header; presumably most of the state below is protected by cache_mutex --
// confirm in the implementation file.
common::lrucache<std::string, SCacheVal> lru_cache;
std::map<std::string, SFdKey> open_files;
std::map<IFsFile*, ReadOnlyFileWrapper*> read_only_open_files;
std::map<std::string, int> preload_once_items;
std::map<std::string, int64> preload_once_delayed_removal;
// ---- Submission queue ----
// submission_items indexes the queue by (int64, key) and stores list
// iterators; std::list iterators stay valid when other elements are inserted
// or erased, which is what makes holding them in a map workable.
std::list<SSubmissionItem> submission_queue;
std::list<SSubmissionItem>::iterator submission_queue_memfile_first;
std::map<std::pair<int64, std::string>, std::list<SSubmissionItem>::iterator > submission_items;
std::map<int64, size_t> num_dirty_items;
// Only present when DIRTY_ITEM_CHECK is defined (builds without NDEBUG).
#ifdef DIRTY_ITEM_CHECK
std::map<int64, std::map<std::string, size_t> > dirty_items;
std::map<int64, std::map<std::string, int64> > dirty_items_size;
#endif
std::map<int64, size_t> num_delete_items;
common::lrucache<std::string, SFdKey> fd_cache;
// ---- Locks and condition variables ----
// cache_mutex_t is test_mutex (non-recursive, owner-tracking) in builds
// without NDEBUG and std::recursive_mutex otherwise.
cache_mutex_t cache_mutex;
std::recursive_mutex submission_mutex;
std::recursive_mutex dirty_item_mutex;
std::condition_variable_any evict_cond;
std::set<std::string> queued_dels;
std::map<std::string, size_t> in_retrieval;
std::condition_variable_any retrieval_cond;
common::lrucache<std::string, SCacheVal> compressed_items;
std::set<std::string> dirty_evicted_items;
std::map<int64, std::set<std::string> > nosubmit_dirty_items;
std::set<std::string> nosubmit_untouched_items;
// ---- Deferred file deletion ----
std::set<std::string> del_file_queue;
std::string prio_del_file;
std::unique_ptr<ICondition> prio_del_file_cond;
std::unique_ptr<IMutex> del_file_mutex;
std::unique_ptr<IMutex> del_file_single_mutex;
std::vector<SFile> transactions;
std::unique_ptr<IMutex> evicted_mutex;
// ---- In-memory files ----
std::recursive_mutex memfiles_mutex;
// Keyed by (int64, key). NOTE(review): the int64 is presumably the
// transaction id -- confirm.
common::lrucache<std::pair<int64, std::string>, SMemFile> memfiles;
std::vector<std::pair<int64, std::unique_ptr<Bitmap> > > memfile_stat_bitmaps;
std::map<int64, size_t> num_mem_files;
relaxed_atomic<int64> memfile_size;
relaxed_atomic<int64> submitted_memfile_size;
// ---- Submit bundling ----
int64 submit_bundle_starttime;
std::unique_ptr<IMutex> submit_bundle_item_mutex;
std::vector<std::pair<SSubmissionItem, bool> > submit_bundle;
std::set<std::pair<std::string, int64> > submit_bundle_items_a;
std::set<std::pair<std::string, int64> > submit_bundle_items_b;
// NOTE(review): presumably these point at submit_bundle_items_a / _b and are
// swapped to alternate between the two sets -- confirm.
std::set<std::pair<std::string, int64> >* curr_submit_bundle_items;
std::set<std::pair<std::string, int64> >* other_submit_bundle_items;
std::set<std::string> missing_items;
std::vector<THREADPOOL_TICKET> threads;
IBackupFileSystem* cachefs;
// ---- Size accounting and statistics ----
relaxed_atomic<int64> cachesize;
relaxed_atomic<int64> dirty_bytes;
relaxed_atomic<int64> comp_bytes;
relaxed_atomic<int64> submitted_bytes;
relaxed_atomic<int64> total_submitted_bytes;
// Unlike the neighbouring counters these two are plain int64, not
// relaxed_atomic. NOTE(review): presumably only accessed under a lock --
// confirm.
int64 total_hits;
int64 total_memory_hits;
relaxed_atomic<int64> total_dirty_ops;
relaxed_atomic<int64> total_cache_miss_backend;
relaxed_atomic<int64> total_cache_miss_decompress;
relaxed_atomic<int64> total_put_ops;
relaxed_atomic<int64> total_compress_ops;
relaxed_atomic<int64> max_cachesize;
// ---- Transaction ids and configured limits ----
int64 transid;
int64 basetrans;
int64 min_cachesize;
int64 min_free_size;
int64 throttle_free_size;
int64 critical_free_size;
float comp_percent;
int64 comp_start_limit;
bool curr_submit_compress_evict;
float resubmit_compressed_ratio;
int64 max_memfile_size;
size_t submitted_memfiles;
// ---- Throttling / eviction state ----
// Only has_new_remaining_gets and new_remaining_gets carry default member
// initializers in this group; the others have no in-class initial value.
size_t remaining_gets;
bool has_new_remaining_gets = false;
size_t new_remaining_gets = 0;
size_t unthrottled_gets;
double unthrottled_gets_avg;
bool do_evict;
int64 do_evict_starttime;
bool do_stop;
size_t evict_queue_depth;
size_t compress_queue_depth;
relaxed_atomic<int64> metadata_cache_free;
int64 min_metadata_cache_free;
// ---- Collaborators (raw pointers) ----
IOnlineKvStore* online_kv_store;
std::string encryption_key;
ICompressEncryptFactory* compress_encrypt_factory;
// ---- Helper thread objects ----
// Declared as value members. The first three are constructed from a
// TransactionalKvStore*, per their constructors.
RegularSubmitBundleThread regular_submit_bundle_thread;
ThrottleThread throttle_thread;
MetadataUpdateThread metadata_update_thread;
MemfdDelThread memfd_del_thread;
// ---- Feature switches and misc configuration ----
bool with_sync_wait;
bool with_prev_link;
bool allow_evict;
bool with_submitted_files;
size_t fd_cache_size;
bool evict_non_dirty_memfiles;
int64 compression_starttime;
// Set through set_num_second_chances_callback() (raw pointer).
INumSecondChancesCallback* num_second_chances_cb;
bool only_memfiles;
bool disable_read_memfiles;
bool disable_write_memfiles;
unsigned int background_comp_method;
size_t retrieval_waiters_async;
size_t retrieval_waiters_sync;
std::string memcache_path;
std::unique_ptr<IFsFile> cache_lock_file;
unsigned int cache_comp;
unsigned int meta_cache_comp;
};