New settings to compress cache (meta) objects via btrfs compression

(cherry picked from commit 989e60f639e7add19c0a72667efa4fa9219f0b6a)

# Conflicts:
#	Interface/BackupFileSystem.h
#	btrfs/btrfsplugin/BackupFileSystem.h
#	clouddrive/CloudFile.cpp
#	clouddrive/KvStoreBackendAzure.cpp
#	clouddrive/KvStoreBackendAzure.h
#	clouddrive/KvStoreBackendLocal.cpp
#	clouddrive/KvStoreBackendLocal.h
#	clouddrive/KvStoreBackendS3.cpp
#	clouddrive/KvStoreBackendS3.h
#	clouddrive/KvStoreFrontend.cpp
#	clouddrive/OnlineKvStore.cpp
#	clouddrive/OnlineKvStore.h
#	clouddrive/PassThroughFileSystem.cpp
#	clouddrive/dllmain.cpp
This commit is contained in:
Martin 2021-07-12 19:04:17 +02:00
parent e0b33e4f1a
commit a9c2bd1ec5
14 changed files with 8264 additions and 8165 deletions

File diff suppressed because it is too large Load Diff

View File

@ -62,7 +62,8 @@ public:
unsigned int background_comp_method,
const std::string& slog_path,
int64 slog_max_size,
bool is_async);
bool is_async,
unsigned int cache_comp, unsigned int meta_cache_comp);
~CloudFile();

View File

@ -88,7 +88,9 @@ IFile* ClouddriveFactory::createCloudFile(IBackupFileSystem* cachefs, CloudSetti
settings.resubmit_compressed_ratio, settings.memcache_size, std::string(),
settings.memory_usage_factor, settings.only_memfiles, std::string(),
std::string(), static_cast<unsigned int>(settings.background_compression),
std::string(), -1, false);
std::string(), -1, false,
static_cast<unsigned int>(settings.cache_object_compression),
static_cast<unsigned int>(settings.metadata_cache_object_compression));
}
IKvStoreBackend* ClouddriveFactory::createBackend(IBackupFileSystem* cachefs,

View File

@ -52,10 +52,35 @@ unsigned int CompressionMethodFromString(const std::string & str)
return CompressionNone;
else if (str == "zstd_7")
return CompressionZstd7;
else if (str == "lzo" )
return CompressionLzo;
else
return CompressionZstd3;
}
std::string CompressionMethodToBtrfsString(unsigned int cmeth)
{
switch (cmeth)
{
case CompressionZlib5:
return "zlib:5";
case CompressionZstd3:
return "zstd:3";
case CompressionZstd19:
return "zstd:15";
case CompressionZstd9:
return "zstd:9";
case CompressionZstd7:
return "zstd:7";
case CompressionNone:
return "none";
case CompressionLzo:
return "lzo";
default:
return "zstd:3";
}
}
ICompressEncryptFactory* get_compress_encrypt_factory()
{
return compress_encrypt_factory;

View File

@ -21,7 +21,9 @@ public:
zstd_3 = 2,
zstd_19 = 3,
zstd_9 = 4,
none = 5
none = 5,
zstd7 = 6,
lzo = 7
};
struct CloudSettingsS3
@ -48,6 +50,8 @@ public:
CompressionMethod submit_compression = CompressionMethod::none;
CompressionMethod metadata_submit_compression = CompressionMethod::zstd_3;
CompressionMethod background_compression = CompressionMethod::lzma_5;
CompressionMethod cache_object_compression = CompressionMethod::none;
CompressionMethod metadata_cache_object_compression = CompressionMethod::lzo;
bool multi_trans_delete = true;
int64 reserved_cache_device_space = 100 * 1024 * 1024;
float memory_usage_factor = 0.1f;

View File

@ -34,9 +34,11 @@ namespace
const unsigned int CompressionZstd9 = 4;
const unsigned int CompressionNone = 5;
const unsigned int CompressionZstd7 = 6;
const unsigned int CompressionLzo = 7;
}
unsigned int CompressionMethodFromString(const std::string& str);
std::string CompressionMethodToBtrfsString(unsigned int cmeth);
class ICompressEncryptFactory : public IObject
{

View File

@ -57,15 +57,16 @@ public:
const static unsigned int GetStatusEnospc = 2;
const static unsigned int GetStatusNotFound = 4;
const static unsigned int GetStatusRepairError = 8;
const static unsigned int GetStatusSkipped = 16;
virtual bool get( const std::string& key, const std::string& path, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile*& ret_file, std::string& ret_md5sum, unsigned int& get_status) = 0;
virtual bool get( const std::string& key, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile* ret_file, std::string& ret_md5sum, unsigned int& get_status) = 0;
virtual bool list(IListCallback* callback) = 0;
const static unsigned int PutAlreadyCompressedEncrypted = 1;
const static unsigned int PutMetadata = 2;
virtual bool put( const std::string& key, IFsFile* src, const std::string& path,
virtual bool put( const std::string& key, IFsFile* src,
unsigned int flags, bool allow_error_event, std::string& md5sum, int64& size) = 0;
enum class key_next_action_t

View File

@ -1,12 +1,12 @@
#pragma once
#include "../Interface/File.h"
#include "../Interface/Object.h"
#include "../Interface/SharedMutex.h"
#include "../Interface/Object.h"
#include "../Interface/SharedMutex.h"
class IOnlineKvStore : public IObject
{
public:
virtual IFsFile* get(const std::string& key, int64 transid, const std::string& path,
virtual IFsFile* get(const std::string& key, int64 transid,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event,
bool& not_found, int64* get_transid=nullptr) = 0;
@ -19,7 +19,7 @@ public:
const static unsigned int PutMetadata = 2;
virtual bool put(const std::string& key, int64 transid, IFsFile* src,
const std::string& path, unsigned int flags,
unsigned int flags,
bool allow_error_event, int64& compressed_size) = 0;
virtual int64 new_transaction(bool allow_error_event) = 0;

View File

@ -339,9 +339,10 @@ void KvStoreBackendS3::init_mutex()
Aws::InitAPI(options);
}
bool KvStoreBackendS3::get( const std::string& key, const std::string& path, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile*& ret_file, std::string& ret_md5sum, unsigned int& get_status)
bool KvStoreBackendS3::get( const std::string& key, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile* ret_file, std::string& ret_md5sum, unsigned int& get_status)
{
assert(ret_file!=nullptr);
get_status=0;
size_t idx0 = 0;
@ -444,37 +445,11 @@ bool KvStoreBackendS3::get( const std::string& key, const std::string& path, con
downloaded_bytes+=tmpfile->Size();
std::unique_ptr<IFsFile> res_src;
IFsFile* res;
if(ret_file==nullptr)
{
res_src.reset(cachefs->openFile(path, MODE_RW_CREATE));
if(res_src.get()==nullptr)
{
std::string syserr = os_last_error_str();
if (allow_error_event)
{
addSystemEvent("s3_backend",
"Error opening result file",
"Error opening result file at " + path + ". " + syserr, LL_ERROR);
}
Server->Log("Error opening result file at "+path+". "+syserr, LL_ERROR);
return false;
}
res = res_src.get();
}
else
{
res = ret_file;
}
std::unique_ptr<IDecryptAndDecompress> decrypt_and_decompress;
if(flags & IKvStoreBackend::GetDecrypted)
{
decrypt_and_decompress.reset(compress_encrypt_factory->createDecryptAndDecompress(encryption_key, res));
decrypt_and_decompress.reset(compress_encrypt_factory->createDecryptAndDecompress(encryption_key, ret_file));
}
std::vector<char> buffer;
@ -505,7 +480,7 @@ bool KvStoreBackendS3::get( const std::string& key, const std::string& path, con
else if(decrypt_and_decompress.get()==nullptr)
{
md_check.update(reinterpret_cast<unsigned char*>(buffer.data()), read);
if(res->Write(buffer.data(), read)!=read)
if(ret_file->Write(buffer.data(), read)!=read)
{
std::string syserr = os_last_error_str();
Server->Log("Error writing to result file. "+syserr, LL_ERROR);
@ -557,10 +532,6 @@ bool KvStoreBackendS3::get( const std::string& key, const std::string& path, con
return false;
}
if(ret_file==nullptr)
{
ret_file = res_src.release();
}
return true;
}
else
@ -691,36 +662,10 @@ bool KvStoreBackendS3::list( IListCallback* callback )
return true;
}
bool KvStoreBackendS3::put( const std::string& key, IFsFile* src, const std::string& path,
bool KvStoreBackendS3::put( const std::string& key, IFsFile* src,
unsigned int flags, bool allow_error_event, std::string& md5sum, int64& compressed_size)
{
std::unique_ptr<IFsFile> tsourcefile;
if(src==nullptr)
{
tsourcefile.reset(cachefs->openFile(path, MODE_READ));
if(tsourcefile.get()==nullptr)
{
std::string syserr = os_last_error_str();
Server->Log("Error opening source file "+ path+". "+syserr, LL_ERROR);
cachefs->openFile(path, MODE_READ);
if(allow_error_event)
{
addSystemEvent("s3_backend",
"Error opening source file",
"Error opening source file "+ path+". "+syserr, LL_ERROR);
}
return false;
}
src = tsourcefile.get();
}
else
{
src->Seek(0);
}
src->Seek(0);
unsigned int curr_comp_method = (flags & IKvStoreBackend::GetMetadata) > 0 ? comp_method_metadata
: comp_method;
@ -786,8 +731,6 @@ bool KvStoreBackendS3::put( const std::string& key, IFsFile* src, const std::str
local_md5 = compress_encrypt->md5sum();
tsourcefile.reset();
tmpfile_delete.release();
offset_buffer.reset(new offset_buf(0, tmpfile, this, true));
}

View File

@ -22,12 +22,12 @@ public:
static void init_mutex();
virtual bool get( const std::string& key, const std::string& path, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile*& ret_file, std::string& ret_md5sum, unsigned int& get_status);
virtual bool get( const std::string& key, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile* ret_file, std::string& ret_md5sum, unsigned int& get_status);
virtual bool list( IListCallback* callback );
virtual bool put( const std::string& key, IFsFile* src, const std::string& path,
virtual bool put( const std::string& key, IFsFile* src,
unsigned int flags, bool allow_error_event, std::string& md5sum,
int64& compressed_size) override;

View File

@ -41,6 +41,7 @@
#include "../urbackupcommon/events.h"
#include <zlib.h>
#include <memory>
#include "Auto.h"
#ifdef HAS_MIRROR
#include "../urbackupserver/server_settings.h"
#endif
@ -264,7 +265,7 @@ KvStoreFrontend::KvStoreFrontend(const std::string& db_path,
IFsFile* ret_file = tmp_f;
std::string ret_md5sum;
unsigned int get_status;
if (!backend->get("cd_magic_file", std::string(), std::string(), IKvStoreBackend::GetDecrypted,
if (!backend->get("cd_magic_file", std::string(), IKvStoreBackend::GetDecrypted,
false, ret_file, ret_md5sum, get_status))
{
KvStoreDao::CdSingleObject obj = dao.getSingleObject();
@ -274,7 +275,7 @@ KvStoreFrontend::KvStoreFrontend(const std::string& db_path,
Server->Log("Retrieving " + prefixKey(encodeKey(obj.tkey, obj.trans_id))
+ " (first object in cache database)");
if (!backend->get(prefixKey(encodeKey(obj.tkey, obj.trans_id)),
std::string(), obj.md5sum, IKvStoreBackend::GetDecrypted,
obj.md5sum, IKvStoreBackend::GetDecrypted,
false, ret_file, ret_md5sum, get_status))
{
Server->Log("Retrieving " + prefixKey(encodeKey(obj.tkey, obj.trans_id))
@ -298,7 +299,7 @@ KvStoreFrontend::KvStoreFrontend(const std::string& db_path,
if (has_item.has_item
&& !backend->get(has_item.item_key,
std::string(), std::string(), IKvStoreBackend::GetDecrypted,
std::string(), IKvStoreBackend::GetDecrypted,
true, ret_file, ret_md5sum, get_status))
{
std::string err = "Error getting item " + has_item.item_key + " from cloud drive. Encryption key may be wrong.";
@ -330,7 +331,7 @@ KvStoreFrontend::KvStoreFrontend(const std::string& db_path,
std::string md5sum;
int64 size;
if (!backend->put("cd_magic_file", new_f, std::string(), 0, true, md5sum, size))
if (!backend->put("cd_magic_file", new_f, 0, true, md5sum, size))
{
std::string err = "Error uploading magic file.";
setMountStatusErr(err);
@ -469,12 +470,14 @@ KvStoreFrontend::KvStoreFrontend(const std::string& db_path,
if (!scrub_obj.empty())
{
std::unique_ptr<IFile> tmpf(Server->openTemporaryFile());
IFsFile* ret_file = nullptr;
ScopedFreeObjRef<IFsFile*> free_get_file(ret_file);
IFsFile* tmpf = Server->openTemporaryFile();
ScopedDeleteFile del_tmpf(tmpf);
if(tmpf==nullptr)
abort();
std::string ret_md5sum;
unsigned int get_status;
if (!backend->get(trim(scrub_obj), "tmp://"+tmpf->getFilename(), std::string(), IKvStoreBackend::GetScrub, true, ret_file, ret_md5sum, get_status))
if (!backend->get(trim(scrub_obj), std::string(), IKvStoreBackend::GetScrub, true, tmpf, ret_md5sum, get_status))
{
Server->Log("Get scrub of object " + scrub_obj + " failed", LL_ERROR);
}
@ -509,23 +512,12 @@ KvStoreFrontend::~KvStoreFrontend()
}
IFsFile* KvStoreFrontend::get(int64 cd_id, const std::string& key, int64 transid,
const std::string& path, bool prioritize_read, IFsFile* tmpl_file,
bool prioritize_read, IFsFile* tmpl_file,
bool allow_error_event, bool& not_found, int64* get_transid)
{
not_found=false;
if(tmpl_file==nullptr
&& !cachefs->directoryExists(ExtractFilePath(path)))
{
if(!cachefs->createDir(ExtractFilePath(path)))
{
std::string syserr = os_last_error_str();
Server->Log("Error creating directory for GET-file with key "+bytesToHex(key)+" path "+ ExtractFilePath(path)+". "+ syserr, LL_ERROR);
addSystemEvent("cache_err_retry",
"Error creating directory for get file on cache",
"Error creating directory for GET-file with key " + bytesToHex(key) + " path " + ExtractFilePath(path) + ". " + syserr, LL_ERROR);
return nullptr;
}
}
if (tmpl_file == nullptr)
return nullptr;
bool is_unsynced = false;
SUnsyncedKey unsynced_key;
@ -636,10 +628,8 @@ IFsFile* KvStoreFrontend::get(int64 cd_id, const std::string& key, int64 transid
|| cd_object.size==-1)
{
not_found=true;
if (tmpl_file!=nullptr)
return tmpl_file;
dao.getDb()->freeMemory();
return cachefs->openFile(path, MODE_RW_CREATE);
return tmpl_file;
}
if (cd_object.md5sum.empty()
@ -674,13 +664,13 @@ IFsFile* KvStoreFrontend::get(int64 cd_id, const std::string& key, int64 transid
bool from_mirror = false;
std::string bkey = prefixKey(encodeKey(cd_id, key, cd_object.trans_id));
if (!backend->get(bkey, path, cd_object.md5sum,
if (!backend->get(bkey, cd_object.md5sum,
flags, allow_error_event, ret, ret_md5sum, get_status))
{
not_found = (get_status & IKvStoreBackend::GetStatusNotFound)>0;
bool backend_not_found = not_found;
if (backend_mirror == nullptr
|| !backend_mirror->get(bkey, path,
|| !backend_mirror->get(bkey,
get_md5sum(cd_object.md5sum),
flags, allow_error_event, ret, ret_md5sum, get_status))
{
@ -690,9 +680,7 @@ IFsFile* KvStoreFrontend::get(int64 cd_id, const std::string& key, int64 transid
not_found = backend_not_found && not_found;
if (not_found)
{
if (tmpl_file != nullptr)
return tmpl_file;
return cachefs->openFile(path, MODE_RW_CREATE);
return tmpl_file;
}
return nullptr;
}
@ -811,7 +799,7 @@ bool KvStoreFrontend::reset(const std::string & key, int64 transid)
}
bool KvStoreFrontend::put(int64 cd_id, const std::string& key, int64 transid,
int64 generation, IFsFile* src, const std::string& path, unsigned int cflags,
int64 generation, IFsFile* src, unsigned int cflags,
bool allow_error_event, int64& compressed_size)
{
IScopedReadLock read_lock(put_shared_mutex.get());
@ -906,7 +894,7 @@ bool KvStoreFrontend::put(int64 cd_id, const std::string& key, int64 transid,
put_flags |= IKvStoreBackend::PutMetadata;
std::string md5sum;
compressed_size = 0;
bool ret = backend->put(tkey, src, path, put_flags, allow_error_event, md5sum, compressed_size);
bool ret = backend->put(tkey, src, put_flags, allow_error_event, md5sum, compressed_size);
if(ret)
{
@ -1006,8 +994,16 @@ bool KvStoreFrontend::transaction_finalize(int64 cd_id, int64 transid, bool comp
else
tkey = convert(cd_id)+"_"+convert(transid) + (complete ? "_complete" : "_finalized");
std::unique_ptr<IFsFile> src(cachefs->openFile(empty_file_path, MODE_READ));
if(!src)
{
Server->Log("Error opening empty file. "+cachefs->lastError(), LL_ERROR);
return false;
}
if (backend->put(prefixKey(tkey),
nullptr, empty_file_path, 0, allow_error_event, md5sum, size))
src.get(), 0, allow_error_event, md5sum, size))
{
tries = -2;
break;
@ -1061,6 +1057,14 @@ bool KvStoreFrontend::set_active_transactions(int64 cd_id, const std::vector<int
{
Server->Log("Setting active transactions... (cd_id=" + convert(cd_id) + ")", LL_INFO);
std::unique_ptr<IFsFile> empty_f(cachefs->openFile(empty_file_path, MODE_READ));
if(!empty_f)
{
Server->Log("Error opening empty file -2. "+cachefs->lastError(), LL_ERROR);
return false;
}
IDatabase* db = getDatabase();
KvStoreDao dao(db);
DBScopedSynchronous synchronous(dao.getDb());
@ -1095,7 +1099,7 @@ bool KvStoreFrontend::set_active_transactions(int64 cd_id, const std::vector<int
tkey = convert(incomplete_transactions[i]) + "_inactive";
else
tkey = convert(cd_id)+"_"+convert(incomplete_transactions[i]) + "_inactive";
if (backend->put(prefixKey(tkey), nullptr, empty_file_path, 0, true, md5sum, size))
if (backend->put(prefixKey(tkey), empty_f.get(), 0, true, md5sum, size))
{
tries = -2;
break;
@ -1673,16 +1677,15 @@ bool KvStoreFrontend::importFromBackend( KvStoreDao& dao )
return false;
}
ScopedDeleteFile tmp_f_del(tmp_f);
IFsFile* ret_file = tmp_f;
std::string ret_md5sum;
unsigned int get_status;
int64 import_total_num_files = -1;
if (backend->get("cd_num_file", std::string(), std::string(), IKvStoreBackend::GetDecrypted,
false, ret_file, ret_md5sum, get_status))
if (backend->get("cd_num_file", std::string(), IKvStoreBackend::GetDecrypted,
false, tmp_f, ret_md5sum, get_status))
{
if (ret_file->Size() < 1 * 1024 * 1024)
if (tmp_f->Size() < 1 * 1024 * 1024)
{
std::string num_data = ret_file->Read(0LL, static_cast<_u32>(ret_file->Size()));
std::string num_data = tmp_f->Read(0LL, static_cast<_u32>(tmp_f->Size()));
CRData rnumdata(num_data.data(), num_data.size());
@ -1703,7 +1706,7 @@ bool KvStoreFrontend::importFromBackend( KvStoreDao& dao )
}
else
{
Server->Log("cd_num_file too large ("+PrettyPrintBytes(ret_file->Size())+")", LL_WARNING);
Server->Log("cd_num_file too large ("+PrettyPrintBytes(tmp_f->Size())+")", LL_WARNING);
}
}
else
@ -1731,17 +1734,14 @@ bool KvStoreFrontend::importFromBackend( KvStoreDao& dao )
{
Server->Log("Retrieving last object to get current generation...", LL_INFO);
std::unique_ptr<IFile> temp_file(Server->openTemporaryFile());
std::string temp_file_path = temp_file->getFilename();
temp_file.reset();
IFsFile* temp_file = Server->openTemporaryFile();
ScopedDeleteFile delete_last_f(temp_file);
IFsFile* last_f = nullptr;
std::string md5sum_ret;
unsigned int get_status;
bool b = backend->get(import_callback.lastModifiedKey(), "tmp://"+temp_file_path, import_callback.lastModifiedMd5sum(), 0, true, last_f, md5sum_ret, get_status);
ScopedDeleteFile delete_last_f(last_f);
bool b = backend->get(import_callback.lastModifiedKey(), import_callback.lastModifiedMd5sum(), 0, true, temp_file, md5sum_ret, get_status);
if(!b || last_f==nullptr)
if(!b)
{
Server->Log("Error retrieving last modified file", LL_ERROR);
return false;
@ -1749,7 +1749,7 @@ bool KvStoreFrontend::importFromBackend( KvStoreDao& dao )
int64 generation=0;
if(!read_generation(last_f, 0, generation))
if(!read_generation(temp_file, 0, generation))
{
return false;
}
@ -1985,11 +1985,10 @@ bool KvStoreFrontend::reupload(int64 transid_start, int64 transid_stop, IKvStore
std::string ret_md5sum;
unsigned int get_status;
IFsFile* ret_file = tmp_f;
int flags = IKvStoreBackend::GetDecrypted;
if (!old_backend->get(fkey, std::string(), md5sum, flags, false, ret_file, ret_md5sum, get_status))
if (!old_backend->get(fkey, md5sum, flags, false, tmp_f, ret_md5sum, get_status))
{
if (!backend->get(fkey, std::string(), md5sum, flags, false, ret_file, ret_md5sum, get_status))
if (!backend->get(fkey, md5sum, flags, false, tmp_f, ret_md5sum, get_status))
{
Server->Log("Error reading " + fkey + ". status=" + convert(get_status), LL_ERROR);
ret = false;
@ -2000,7 +1999,7 @@ bool KvStoreFrontend::reupload(int64 transid_start, int64 transid_stop, IKvStore
{
int64 size = 0;
std::string new_md5sum;
if (backend->put(fkey, ret_file, "tmp://"+ret_file->getFilename(), 0, false, ret_md5sum, size))
if (backend->put(fkey, tmp_f, 0, false, ret_md5sum, size))
{
Server->Log("Successfully rewritten " + fkey);
KvStoreDao::CdIterObject new_obj;
@ -2377,7 +2376,7 @@ bool KvStoreFrontend::update_total_num(int64 num)
std::string md5sum;
int64 size;
if (!backend->put("cd_num_file", new_f, std::string(), 0, true, md5sum, size))
if (!backend->put("cd_num_file", new_f, 0, true, md5sum, size))
{
std::string err = "Error uploading cd_num_file file.";
Server->Log(err, LL_ERROR);
@ -4257,7 +4256,16 @@ void KvStoreFrontend::PutDbWorker::worker_abort()
void KvStoreFrontend::ScrubThread::operator()()
{
std::string unused_tmp_file;
Auto(delete this);
IFsFile* tmpf = Server->openTemporaryFile();
ScopedDeleteFile del_tmpf(tmpf);
if (tmpf == nullptr)
{
Server->Log("Error opening temporary file in scrub. " + os_last_error_str(), LL_ERROR);
scrub_queue.set_error(true);
return;
}
while (true)
{
@ -4267,23 +4275,9 @@ void KvStoreFrontend::ScrubThread::operator()()
{
break;
}
if (unused_tmp_file.empty())
{
std::unique_ptr<IFile> tmpf(Server->openTemporaryFile());
if (tmpf.get() == nullptr)
{
Server->Log("Error opening temporary file in scrub. " + os_last_error_str(), LL_ERROR);
scrub_queue.set_error(true);
return;
}
unused_tmp_file = tmpf->getFilename();
}
ScopedDeleteFn delete_fn(unused_tmp_file);
IFsFile* get_file = nullptr;
std::string ret_md5sum;
unsigned int get_status;
ScopedFreeObjRef<IFsFile*> free_get_file(get_file);
int flags = IKvStoreBackend::GetBackground;
if (scrub_action == ScrubAction::Balance)
@ -4321,8 +4315,8 @@ void KvStoreFrontend::ScrubThread::operator()()
}
bool b = backend->get(object_fn,
"tmp://"+unused_tmp_file, item.md5sum, flags,
false, get_file, ret_md5sum, get_status);
item.md5sum, flags,
false, tmpf, ret_md5sum, get_status);
size_t tries = 3;
size_t retry_n = 0;
@ -4342,9 +4336,9 @@ void KvStoreFrontend::ScrubThread::operator()()
}
Server->wait(1000);
b = backend->get(object_fn,
"tmp://"+unused_tmp_file, item.md5sum, flags,
item.md5sum, flags,
false,
get_file, ret_md5sum, get_status);
tmpf, ret_md5sum, get_status);
--tries;
++retry_n;
@ -4360,8 +4354,8 @@ void KvStoreFrontend::ScrubThread::operator()()
{
if (backend_mirror == nullptr
|| !backend_mirror->get(object_fn,
"tmp://"+unused_tmp_file, item.md5sum, flags,
false, get_file, ret_md5sum, get_status))
item.md5sum, flags,
false, tmpf, ret_md5sum, get_status))
{
Server->Log("Error while " + strScrubAction(scrub_action) + " key " + object_fn+" Has_newer = "+convert(has_newer), LL_ERROR);
curr_has_error = true;
@ -4376,14 +4370,24 @@ void KvStoreFrontend::ScrubThread::operator()()
Server->Log("Repairing. Adding empty file...", LL_WARNING);
int64 size;
std::string md5sum;
if (!backend->put(object_fn,
nullptr, frontend->empty_file_path, 0, false, md5sum, size))
std::unique_ptr<IFsFile> empty_file(frontend->cachefs->openFile("empty.file", MODE_READ));
if(!empty_file)
{
Server->Log("Error while opening empty file. "+frontend->cachefs->lastError(), LL_ERROR);
++scrub_queue.scrub_errors;
}
else
{
++scrub_queue.scrub_repaired;
if (!backend->put(object_fn,
empty_file.get(), 0, false, md5sum, size))
{
++scrub_queue.scrub_errors;
}
else
{
++scrub_queue.scrub_repaired;
}
}
}
else
@ -4391,8 +4395,8 @@ void KvStoreFrontend::ScrubThread::operator()()
++scrub_queue.scrub_errors;
}
}
else if (backend_mirror != nullptr
&& get_file != nullptr)
else if (backend_mirror != nullptr &&
!(get_status & IKvStoreBackend::GetStatusSkipped))
{
bool put_success = false;
size_t tries = 0;
@ -4400,7 +4404,7 @@ void KvStoreFrontend::ScrubThread::operator()()
{
std::string put_md5sum;
int64 put_size = 0;
if (backend->put(object_fn, get_file, "tmp://"+unused_tmp_file, IKvStoreBackend::PutAlreadyCompressedEncrypted, true, put_md5sum, put_size))
if (backend->put(object_fn, tmpf, IKvStoreBackend::PutAlreadyCompressedEncrypted, true, put_md5sum, put_size))
{
put_success = true;
item.md5sum = put_md5sum;
@ -4429,7 +4433,7 @@ void KvStoreFrontend::ScrubThread::operator()()
++scrub_queue.scrub_errors;
}
}
else if (scrub_action != ScrubAction::Scrub || get_file != nullptr)
else
{
if (get_status & IKvStoreBackend::GetStatusRepaired)
++scrub_queue.scrub_repaired;
@ -4437,20 +4441,16 @@ void KvStoreFrontend::ScrubThread::operator()()
++scrub_queue.scrub_oks;
}
if (scrub_action == ScrubAction::Scrub && get_file == nullptr && !curr_has_error)
if(!(get_status & IKvStoreBackend::GetStatusSkipped))
{
Server->Log("No file while "+ strScrubAction(scrub_action)+" key "+
object_fn+" has_newer="+convert(has_newer), LL_ERROR);
++scrub_queue.scrub_errors;
}
if (b && get_file == nullptr)
{
delete_fn.release();
}
else
{
unused_tmp_file.clear();
if(!tmpf->Resize(0))
{
Server->Log("Error resizing temporary file in scrub. " + os_last_error_str(), LL_ERROR);
scrub_queue.set_error(true);
return;
}
}
if (b
&& !ret_md5sum.empty())
{
@ -4466,19 +4466,12 @@ void KvStoreFrontend::ScrubThread::operator()()
}
done_size += item.size;
if ( (!b || get_file != nullptr) && !has_changes)
if ( !b && !has_changes)
{
IScopedLock lock(new_md5sums_mutex);
has_changes = true;
}
}
if (!unused_tmp_file.empty())
{
Server->deleteFile(unused_tmp_file);
}
delete this;
}
void KvStoreFrontend::MirrorWorker::operator()()
@ -4639,10 +4632,18 @@ void KvStoreFrontend::MirrorWorker::operator()()
continue;
}
std::unique_ptr<IFsFile> empty_file(frontend->cachefs->openFile(frontend->empty_file_path, MODE_READ));
if(!empty_file)
{
Server->Log("Cannot open empty file: "+frontend->cachefs->lastError(), LL_ERROR);
abort();
}
int64 size;
std::string md5sum;
if (frontend->backend_mirror->put(frontend->prefixKey(convert(trans.id) + suffix),
nullptr, frontend->empty_file_path, 0, false, md5sum, size))
empty_file.get(), 0, false, md5sum, size))
{
dao.setTransactionMirrored(trans.id);
--frontend->mirror_items;
@ -4721,12 +4722,11 @@ void KvStoreFrontend::MirrorWorker::operator()()
void KvStoreFrontend::MirrorThread::operator()()
{
std::string tmp_fn;
{
std::unique_ptr<IFile> f(Server->openTemporaryFile());
tmp_fn = f->getFilename();
}
ScopedDeleteFn delete_tmp_fn(tmp_fn);
IFsFile* tmpf = Server->openTemporaryFile();
ScopedDeleteFile del_tmpf(tmpf);
if(tmpf==nullptr)
abort();
std::string cdata;
while (!has_error
@ -4745,27 +4745,29 @@ void KvStoreFrontend::MirrorThread::operator()()
KvStoreDao::CdIterObject2& obj = objs[i];
std::string fn = frontend->prefixKey(frontend->encodeKey(obj.tkey, obj.trans_id));
IFsFile* ret_file = nullptr;
ScopedFreeObjRef<IFsFile*> tmp_f_del(ret_file);
std::string ret_md5sum;
unsigned int get_status;
Server->deleteFile(tmp_fn);
size_t retry_n = 0;
bool success = false;
while (!success
&& retry_n < 10)
{
if (backend->get(fn, "tmp://"+tmp_fn, obj.md5sum, IKvStoreBackend::GetPrependMd5sum, false, ret_file, ret_md5sum, get_status))
if(!tmpf->Resize(0) || !tmpf->Seek(0))
{
abort();
}
if (backend->get(fn, obj.md5sum, IKvStoreBackend::GetPrependMd5sum, false, tmpf, ret_md5sum, get_status))
{
size_t retry_n_put = 0;
while (!success
&& retry_n_put < 10)
{
ret_file->Seek(0);
tmpf->Seek(0);
int64 ret_size;
if (backend_mirror->put(fn, ret_file, "tmp://"+tmp_fn, IKvStoreBackend::PutAlreadyCompressedEncrypted, false, ret_md5sum, ret_size))
if (backend_mirror->put(fn, tmpf, IKvStoreBackend::PutAlreadyCompressedEncrypted, false, ret_md5sum, ret_size))
{
frontend->mirror_curr_pos += obj.size;
success = true;

View File

@ -50,13 +50,13 @@ public:
bool background_worker_manual_run, bool background_worker_multi_trans_delete, IBackupFileSystem* cachefs);
~KvStoreFrontend();
virtual IFsFile* get(const std::string& key, int64 transid, const std::string& path,
virtual IFsFile* get(const std::string& key, int64 transid,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event, bool& not_found,
int64* get_transid = nullptr) {
return get(0, key, transid, path, prioritize_read, tmpl_file, allow_error_event, not_found, get_transid);
return get(0, key, transid, prioritize_read, tmpl_file, allow_error_event, not_found, get_transid);
}
IFsFile* get(int64 cd_id, const std::string& key, int64 transid, const std::string& path,
IFsFile* get(int64 cd_id, const std::string& key, int64 transid,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event, bool& not_found,
int64* get_transid=nullptr);
@ -69,15 +69,15 @@ public:
virtual bool reset(const std::string& key, int64 transid);
virtual bool put(const std::string& key, int64 transid, IFsFile* src,
const std::string& path, unsigned int flags,
unsigned int flags,
bool allow_error_event, int64& compressed_size)
{
return put(0, key, transid, 0, src, path, flags,
return put(0, key, transid, 0, src, flags,
allow_error_event, compressed_size);
}
bool put(int64 cd_id, const std::string& key, int64 transid, int64 generation, IFsFile* src,
const std::string& path, unsigned int flags,
unsigned int flags,
bool allow_error_event, int64& compressed_size);
virtual int64 new_transaction(bool allow_error_event)

View File

@ -463,7 +463,24 @@ public:
&& kv_store.num_second_chances_cb->is_metadata(item->key))
flags |= IOnlineKvStore::PutMetadata;
while(!kv_store.online_kv_store->put(item->key, item->transid, memf_file, path, flags, n>retry_log_n, compressed_size) )
IFsFile* putf = memf_file;
while(putf==nullptr)
{
putf = kv_store.cachefs->openFile(path, MODE_READ);
if(putf==nullptr)
{
std::string syserr = kv_store.cachefs->lastError();
Server->Log("Cannot open file to submit: " + path+". "+ syserr, LL_ERROR);
addSystemEvent("online_kv_store",
"Cannot open file to submit",
"Cannot open file to submit: " + path + ". " + syserr, LL_ERROR);
retryWait(n++);
}
}
while(!kv_store.online_kv_store->put(item->key, item->transid, memf_file, flags, n>retry_log_n, compressed_size) )
{
Server->Log("Error submitting dirty data block "+kv_store.hex(item->key)+" transid "+convert(item->transid)+". Retrying...", LL_ERROR);
if (kv_store.online_kv_store->fast_write_retry())
@ -2201,9 +2218,57 @@ IFsFile* TransactionalKvStore::get_retrieve(const std::string& key, BitmapInfo b
do
{
not_found = false;
online_file = online_kv_store->get(key, transid, keypath2(key, transid),
prioritize_read, memfile, retry_n > retry_log_n, not_found);
assert(online_file == nullptr || memfile == nullptr || online_file == memfile);
IFsFile* online_file_tmpl = memfile;
if (online_file_tmpl == nullptr)
{
std::string online_file_path = keypath2(key, transid);
bool path_err = false;
if (!cachefs->directoryExists(ExtractFilePath(online_file_path)))
{
if (!cachefs->createDir(ExtractFilePath(online_file_path)))
{
std::string syserr = os_last_error_str();
Server->Log("Error creating directory for GET-file with key " + bytesToHex(key) + " path " + ExtractFilePath(online_file_path) + ". " + syserr, LL_ERROR);
addSystemEvent("cache_err_retry",
"Error creating directory for get file on cache",
"Error creating directory for GET-file with key " + bytesToHex(key) + " path " + ExtractFilePath(online_file_path) + ". " + syserr, LL_ERROR);
path_err = true;
}
}
if (!path_err)
{
online_file_tmpl = cachefs->openFile(online_file_path, CREATE_DIRECT);
if (online_file_tmpl == nullptr)
{
std::string syserr = os_last_error_str();
Server->Log("Error opening output file " + online_file_path + ". " + syserr, LL_ERROR);
addSystemEvent("online_kv_store",
"Error opening output file",
"Error opening output file " + online_file_path + ". " + syserr, LL_ERROR);
}
else
{
set_cache_file_compression(key, online_file_path);
}
}
}
if (online_file_tmpl != nullptr)
{
online_file = online_kv_store->get(key, transid,
prioritize_read, online_file_tmpl, retry_n > retry_log_n, not_found);
assert(online_file == nullptr || memfile == nullptr || online_file == memfile);
if (online_file == nullptr &&
online_file_tmpl != memfile)
{
Server->destroy(online_file_tmpl);
cachefs->deleteFile(keypath2(key, transid));
}
}
if (online_file != nullptr)
{
@ -2392,6 +2457,8 @@ IFsFile* TransactionalKvStore::get_retrieve(const std::string& key, BitmapInfo b
retryWait(++retry_n);
}
FILE_SIZE_CACHE(online_file->setCachedSize(0));
set_cache_file_compression(key, path);
}
} while (online_file == nullptr);
@ -3045,7 +3112,8 @@ TransactionalKvStore::TransactionalKvStore(IBackupFileSystem* cachefs, int64 min
bool verify_cache, float cpu_multiplier, size_t no_compress_mult, bool with_prev_link,
bool allow_evict, bool with_submitted_files, float resubmit_compressed_ratio, int64 max_memfile_size,
std::string memcache_path, float memory_usage_factor,
bool only_memfiles, unsigned int background_comp_method)
bool only_memfiles, unsigned int background_comp_method,
unsigned int cache_comp, unsigned int meta_cache_comp)
: min_cachesize(min_cachesize), min_free_size(min_free_size), critical_free_size(critical_free_size),
comp_percent(comp_percent), comp_start_limit(comp_start_limit), throttle_free_size(throttle_free_size),
do_stop(false),
@ -3074,7 +3142,8 @@ TransactionalKvStore::TransactionalKvStore(IBackupFileSystem* cachefs, int64 min
total_submitted_bytes(0),
retrieval_waiters_async(0),
retrieval_waiters_sync(0),
cachefs(cachefs)
cachefs(cachefs),
cache_comp(cache_comp), meta_cache_comp(meta_cache_comp)
{
g_cache_mutex = &cache_mutex;
@ -3958,8 +4027,22 @@ void TransactionalKvStore::read_keys(std::unique_lock<cache_mutex_t>& cache_lock
}
std::string tmpfn = "verify.file";
IFsFile* tmpl_file = cachefs->openFile(tmpfn, CREATE_DIRECT);
if (tmpl_file == nullptr)
{
Server->Log("Could not verify.file on cache fs. "+cachefs->lastError(), LL_ERROR);
assert(false);
throw std::runtime_error("Could not verify.file on cache fs. " + cachefs->lastError());
}
bool not_found = false;
IFile* online_file = online_kv_store->get(key, transid, tmpfn, false, nullptr, true, not_found);
IFile* online_file = online_kv_store->get(key, transid, false, tmpl_file, true, not_found);
if (online_file == nullptr)
{
Server->destroy(tmpl_file);
cachefs->deleteFile(tmpfn);
}
if (online_file == nullptr || not_found)
{
@ -5730,7 +5813,20 @@ bool TransactionalKvStore::read_from_dirty_file(std::unique_lock<cache_mutex_t>&
bool not_found;
int64 get_transid = 0;
IFsFile* f = online_kv_store->get(key, transaction_id, "submit.test.file", false, nullptr, true, not_found, &get_transid);
std::string tmpfn = "submit.test.file";
IFsFile* tmpl_file = cachefs->openFile(tmpfn, CREATE_DIRECT);
if (tmpl_file == nullptr)
{
Server->Log("Cannot open submit.test.file "+cachefs->lastError(), LL_ERROR);
addSystemEvent("cache_err_fatal",
"Cannot open file",
"Cannot open submit.test.file " + cachefs->lastError(), LL_ERROR);
abort();
}
IFsFile* f = online_kv_store->get(key, transaction_id, false, tmpl_file, true, not_found, &get_transid);
if (f == nullptr)
{
Server->Log("Cannot get file not_found="+convert(not_found), LL_ERROR);
@ -5742,7 +5838,7 @@ bool TransactionalKvStore::read_from_dirty_file(std::unique_lock<cache_mutex_t>&
else
{
Server->destroy(f);
cachefs->deleteFile("submit.test.file");
cachefs->deleteFile(tmpfn);
Server->Log("Get ok", LL_WARNING);
}
@ -9035,4 +9131,20 @@ void TransactionalKvStore::MemfdDelThread::operator()()
lock.lock();
}
}
bool TransactionalKvStore::set_cache_file_compression(const std::string& key, const std::string& fpath)
{
if (meta_cache_comp != CompressionNone &&
num_second_chances_cb != nullptr &&
num_second_chances_cb->is_metadata(key))
{
return cachefs->setXAttr(fpath, "btrfs.compression", CompressionMethodToBtrfsString(meta_cache_comp));
}
else if (cache_comp != CompressionNone)
{
return cachefs->setXAttr(fpath, "btrfs.compression", CompressionMethodToBtrfsString(cache_comp));
}
return true;
}

View File

@ -8,12 +8,12 @@
#include "../Interface/BackupFileSystem.h"
#include <set>
#include "../Interface/Condition.h"
#include <memory>
#include <deque>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <memory>
#include <deque>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include "IOnlineKvStore.h"
#include "../urbackupcommon/os_functions.h"
#ifdef HAS_ASYNC
@ -206,7 +206,8 @@ public:
IOnlineKvStore* online_kv_store, const std::string& encryption_key, ICompressEncryptFactory* compress_encrypt_factory, bool verify_cache,
float cpu_multiplier, size_t no_compress_mult, bool with_prev_link, bool allow_evict, bool with_submitted_files,
float resubmit_compressed_ratio, int64 max_memfile_size, std::string memcache_path,
float memory_usage_factor, bool only_memfiles, unsigned int background_comp_method);
float memory_usage_factor, bool only_memfiles, unsigned int background_comp_method,
unsigned int cache_comp, unsigned int meta_cache_comp);
~TransactionalKvStore();
@ -413,8 +414,8 @@ private:
#endif
void wait_for_del_file(const std::string& fn);
bool read_dirty_items(std::unique_lock<cache_mutex_t>& cache_lock, int64 transaction_id, int64 attibute_to_trans_id);
bool read_dirty_items(std::unique_lock<cache_mutex_t>& cache_lock, int64 transaction_id, int64 attibute_to_trans_id);
bool read_submitted_evicted_files(const std::string& fn, std::set<std::string>& sub_evict);
bool read_from_dirty_file(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& fn, int64 transaction_id, bool do_submit, int64 nosubmit_transid);
@ -528,6 +529,8 @@ private:
TransactionalKvStore::SCacheVal cache_val(const std::string& key, bool dirty);
TransactionalKvStore::SCacheVal cache_val_nc(bool dirty);
bool set_cache_file_compression(const std::string& key, const std::string& fpath);
std::list<SSubmissionItem>::iterator submission_queue_add(SSubmissionItem& item, bool memfile);
std::list<SSubmissionItem>::iterator submission_queue_insert(SSubmissionItem& item, bool memfile, std::list<SSubmissionItem>::iterator it);
@ -800,4 +803,7 @@ private:
std::string memcache_path;
std::unique_ptr<IFsFile> cache_lock_file;
unsigned int cache_comp;
unsigned int meta_cache_comp;
};