Add cloud drive

This commit is contained in:
Martin Raiber 2021-05-08 20:52:31 +02:00
parent 82fbab47ff
commit a5e55bb61e
32 changed files with 30782 additions and 1 deletions

2
.gitignore vendored
View File

@ -224,7 +224,6 @@ access_backups_shell_mui.txt
sha2.Plo
*.Plo
osx-pkg
clouddrive/*
transaction_helper/*
urbackup/new2_db/*
urbackup/server_ident_ecdsa409k1.priv
@ -274,3 +273,4 @@ blockalign/x64/*
/btrfs/fuse/x64/*
/luaplugin/Release/*
/urbackup/data_*
/clouddrive/x64/*

View File

@ -39,6 +39,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "btrfsplugin", "btrfs\btrfsp
{EA09B289-492C-4F30-813B-9928994FB21B} = {EA09B289-492C-4F30-813B-9928994FB21B}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "clouddrive", "clouddrive\clouddrive.vcxproj", "{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|ARM = Debug|ARM
@ -345,6 +347,26 @@ Global
{8F901558-A663-4EA8-A52D-E6577ABEB2D1}.Release|Win32.Build.0 = Release|Win32
{8F901558-A663-4EA8-A52D-E6577ABEB2D1}.Release|x64.ActiveCfg = Release|x64
{8F901558-A663-4EA8-A52D-E6577ABEB2D1}.Release|x64.Build.0 = Release|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|ARM.ActiveCfg = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|ARM64.ActiveCfg = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|Win32.ActiveCfg = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|Win32.Build.0 = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|x64.ActiveCfg = Debug|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Debug|x64.Build.0 = Debug|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|ARM.ActiveCfg = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|ARM.Build.0 = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|ARM64.ActiveCfg = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|ARM64.Build.0 = Debug|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|Win32.ActiveCfg = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|Win32.Build.0 = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|x64.ActiveCfg = Release|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release Service|x64.Build.0 = Release|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|ARM.ActiveCfg = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|ARM64.ActiveCfg = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|Win32.ActiveCfg = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|Win32.Build.0 = Release|Win32
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|x64.ActiveCfg = Release|x64
{3B050C2B-BE6E-4837-A81C-B07ABF8F9E18}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

23
clouddrive/Auto.h Normal file
View File

@ -0,0 +1,23 @@
#pragma once
template<class L>
class AtScopeExit {
L& ll;
public:
AtScopeExit(L& action) : ll(action) {}
~AtScopeExit() { ll(); }
};
#define AUTO_TOKEN_PASTEx(x, y) x ## y
#define AUTO_TOKEN_PASTE(x, y) AUTO_TOKEN_PASTEx(x, y)
#define Auto_INTERNAL1(lname, aname, ...) \
auto lname = [&]() { __VA_ARGS__; }; \
AtScopeExit<decltype(lname)> aname(lname);
#define Auto_INTERNAL2(ctr, ...) \
Auto_INTERNAL1(AUTO_TOKEN_PASTE(Auto_func_, ctr), \
AUTO_TOKEN_PASTE(Auto_instance_, ctr), __VA_ARGS__)
#define Auto(...) \
Auto_INTERNAL2(__COUNTER__, __VA_ARGS__)

View File

@ -0,0 +1,148 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "CdZlibCompressor.h"
CdZlibCompressor::CdZlibCompressor(int compression_level, unsigned int compression_id)
: compression_id(compression_id)
{
memset(&strm, 0, sizeof(z_stream));
if (deflateInit(&strm, compression_level) != Z_OK)
{
throw std::runtime_error("Error initializing compression stream");
}
}
CdZlibCompressor::~CdZlibCompressor()
{
deflateEnd(&strm);
}
void CdZlibCompressor::setOut(char * next_out, size_t avail_out)
{
strm.next_out = reinterpret_cast<Bytef*>(next_out);
strm.avail_out = avail_out;
}
void CdZlibCompressor::setIn(char * next_in, size_t avail_in)
{
strm.next_in = reinterpret_cast<Bytef*>(next_in);
strm.avail_in = avail_in;
}
size_t CdZlibCompressor::getAvailOut()
{
return strm.avail_out;
}
size_t CdZlibCompressor::getAvailIn()
{
return strm.avail_in;
}
CompressResult CdZlibCompressor::compress(bool finish, int & code)
{
code = deflate(&strm, finish ? Z_FINISH : Z_NO_FLUSH);
if (code == Z_OK)
{
return CompressResult_Ok;
}
else if (code == Z_STREAM_END)
{
return CompressResult_End;
}
else
{
if (code == Z_DATA_ERROR
&& strm.msg != NULL)
{
Server->Log(std::string("Zlib error: ") + strm.msg, LL_ERROR);
}
return CompressResult_Other;
}
}
unsigned int CdZlibCompressor::getId()
{
return compression_id;
}
CdZlibDecompressor::CdZlibDecompressor()
{
memset(&strm, 0, sizeof(z_stream));
if (inflateInit(&strm) != Z_OK)
{
throw std::runtime_error("Error initializing decompression stream");
}
}
CdZlibDecompressor::~CdZlibDecompressor()
{
inflateEnd(&strm);
}
void CdZlibDecompressor::setOut(char * next_out, size_t avail_out)
{
strm.next_out = reinterpret_cast<Bytef*>(next_out);
strm.avail_out = avail_out;
}
void CdZlibDecompressor::setIn(char * next_in, size_t avail_in)
{
strm.next_in = reinterpret_cast<Bytef*>(next_in);
strm.avail_in = avail_in;
}
size_t CdZlibDecompressor::getAvailOut()
{
return strm.avail_out;
}
size_t CdZlibDecompressor::getAvailIn()
{
return strm.avail_in;
}
DecompressResult CdZlibDecompressor::decompress(int & code)
{
code = inflate(&strm, Z_NO_FLUSH);
if (code == Z_OK)
{
return DecompressResult_Ok;
}
else if (code == Z_STREAM_END)
{
return DecompressResult_End;
}
else
{
if (code == Z_DATA_ERROR)
{
if (strm.msg != nullptr)
{
Server->Log(std::string("Error while decompressing Z_DATA_ERROR: ") + strm.msg, LL_ERROR);
}
}
return DecompressResult_Other;
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <zlib.h>
#include "CompressEncrypt.h"
class CdZlibCompressor : public ICompressor
{
public:
CdZlibCompressor(int compression_level, unsigned int compression_id);
~CdZlibCompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual CompressResult compress(bool finish, int& code);
virtual unsigned int getId();
private:
z_stream strm;
unsigned int compression_id;
};
class CdZlibDecompressor : public IDecompressor
{
public:
CdZlibDecompressor();
~CdZlibDecompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual DecompressResult decompress(int& code);
private:
z_stream strm;
};

View File

@ -0,0 +1,160 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "CdZstdCompressor.h"
CdZstdCompressor::CdZstdCompressor(int compression_level, unsigned int compression_id)
: cctx(ZSTD_createCCtx()), compression_id(compression_id), inBuffer(), outBuffer()
{
if (cctx == nullptr)
{
abort();
}
size_t err = ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, compression_level);
if (ZSTD_isError(err))
{
Server->Log(std::string("Error setting zstd compression level. ") + ZSTD_getErrorName(err), LL_ERROR);
}
err = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 0);
if (ZSTD_isError(err))
{
Server->Log(std::string("Error setting zstd compression flag. ") + ZSTD_getErrorName(err), LL_ERROR);
}
}
CdZstdCompressor::~CdZstdCompressor()
{
ZSTD_freeCCtx(cctx);
}
void CdZstdCompressor::setOut(char * next_out, size_t avail_out)
{
outBuffer.dst = next_out;
outBuffer.pos = 0;
outBuffer.size= avail_out;
}
void CdZstdCompressor::setIn(char * next_in, size_t avail_in)
{
inBuffer.src = next_in;
inBuffer.size = avail_in;
inBuffer.pos = 0;
}
size_t CdZstdCompressor::getAvailOut()
{
return outBuffer.size - outBuffer.pos;
}
size_t CdZstdCompressor::getAvailIn()
{
return inBuffer.size - inBuffer.pos;
}
CompressResult CdZstdCompressor::compress(bool finish, int & code)
{
size_t remaining;
remaining = ZSTD_compressStream2(cctx, &outBuffer, &inBuffer, finish ? ZSTD_e_end : ZSTD_e_continue);
if (finish
&& remaining == 0)
{
return CompressResult_End;
}
else if (ZSTD_isError(remaining))
{
Server->Log(std::string("Error compressing zstd: ") + ZSTD_getErrorName(remaining), LL_ERROR);
code = static_cast<int>(remaining);
return CompressResult_Other;
}
else
{
return CompressResult_Ok;
}
}
unsigned int CdZstdCompressor::getId()
{
return compression_id;
}
CdZstdDecompressor::CdZstdDecompressor()
: dstream(ZSTD_createDStream())
{
if (dstream == nullptr)
abort();
size_t rc =ZSTD_initDStream(dstream);
if (ZSTD_isError(rc))
{
Server->Log(std::string("Error initializing zstd dstream ") + ZSTD_getErrorName(rc), LL_ERROR);
abort();
}
}
CdZstdDecompressor::~CdZstdDecompressor()
{
ZSTD_freeDStream(dstream);
}
void CdZstdDecompressor::setOut(char * next_out, size_t avail_out)
{
outBuffer.dst = next_out;
outBuffer.size = avail_out;
outBuffer.pos = 0;
}
void CdZstdDecompressor::setIn(char * next_in, size_t avail_in)
{
inBuffer.src = next_in;
inBuffer.size = avail_in;
inBuffer.pos = 0;
}
size_t CdZstdDecompressor::getAvailOut()
{
return outBuffer.size - outBuffer.pos;
}
size_t CdZstdDecompressor::getAvailIn()
{
return inBuffer.size - inBuffer.pos;
}
DecompressResult CdZstdDecompressor::decompress(int & code)
{
size_t rc = ZSTD_decompressStream(dstream, &outBuffer, &inBuffer);
if (ZSTD_isError(rc))
{
Server->Log(std::string("Error decompressing zstd: ") + ZSTD_getErrorName(rc), LL_ERROR);
code = static_cast<int>(rc);
return DecompressResult_Other;
}
if (rc == 0)
{
return DecompressResult_End;
}
return DecompressResult_Ok;
}

View File

@ -0,0 +1,46 @@
#pragma once
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include "CompressEncrypt.h"
class CdZstdCompressor : public ICompressor
{
public:
CdZstdCompressor(int compression_level, unsigned int compression_id);
~CdZstdCompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual CompressResult compress(bool finish, int& code);
virtual unsigned int getId();
private:
ZSTD_CCtx* cctx;
unsigned int compression_id;
ZSTD_inBuffer inBuffer;
ZSTD_outBuffer outBuffer;
};
class CdZstdDecompressor : public IDecompressor
{
public:
CdZstdDecompressor();
~CdZstdDecompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual DecompressResult decompress(int& code);
private:
ZSTD_DStream* dstream;
ZSTD_inBuffer inBuffer;
ZSTD_outBuffer outBuffer;
};

7977
clouddrive/CloudFile.cpp Normal file

File diff suppressed because it is too large Load Diff

787
clouddrive/CloudFile.h Normal file
View File

@ -0,0 +1,787 @@
#pragma once
#include "../Interface/File.h"
#include "../Interface/Thread.h"
#include "IOnlineKvStore.h"
#include "../Interface/Mutex.h"
#include "../Interface/SharedMutex.h"
#include "../Interface/ThreadPool.h"
#include "ICompressEncrypt.h"
#include "../common/data.h"
#include "TransactionalKvStore.h"
#ifdef HAS_MOUNT_SERVICE
#include "btrfs_chunks.h"
#endif
#include "KvStoreFrontend.h"
#ifdef HAS_ASYNC
#include "fuse_io_context.h"
#endif
#include <atomic>
#include <queue>
class IBackupFileSystem;
namespace
{
class FileBitmap;
class SparseFileBitmap;
class UpdateMissingChunksThread;
class ShareWithUpdater;
}
void task_set_less_throttle();
void task_unset_less_throttle();
bool flush_dev(std::string dev_fn);
void setMountStatus(const std::string& data);
void setMountStatusErr(const std::string& err);
class CloudFile : public IFile, public IThread, public TransactionalKvStore::INumSecondChancesCallback, IOnlineKvStore::IHasKeyCallback
{
public:
CloudFile(const std::string& cache_path,
IBackupFileSystem* cachefs,
int64 cloudfile_size,
int64 max_cloudfile_size,
IOnlineKvStore* online_kv_store,
const std::string& encryption_key,
ICompressEncryptFactory* compress_encrypt_factory,
bool verify_cache, float cpu_multiplier,
bool background_compress,
size_t no_compress_mult,
int64 reserved_cache_device_space,
int64 min_metadata_cache_free,
bool with_prev_link,
bool allow_evict,
bool with_submitted_files,
float resubmit_compressed_ratio,
int64 max_memfile_size,
std::string memcache_path,
float memory_usage_factor,
bool only_memfiles,
std::string mount_path,
std::string share_with_mount_paths,
unsigned int background_comp_method,
const std::string& slog_path,
int64 slog_max_size,
bool is_async);
~CloudFile();
CloudFile(CloudFile const&) = delete;
CloudFile(CloudFile&& other) = delete;
CloudFile& operator=(CloudFile&&) = delete;
CloudFile& operator=(CloudFile const&) = delete;
virtual std::string Read( _u32 tr, bool* has_error=NULL);
virtual std::string Read( int64 pos, _u32 tr, bool* has_error = NULL);
_u32 Read(int64 pos, char* buffer, _u32 bsize, bool* has_error = NULL);
virtual _u32 Read( char* buffer, _u32 bsize, bool* has_error = NULL);
virtual _u32 Write( const std::string &tw, bool* has_error = NULL);
virtual _u32 Write(int64 pos, const std::string &tw, bool* has_error = NULL);
virtual _u32 Write( const char* buffer, _u32 bsize, bool* has_error = NULL);
virtual _u32 Write( int64 pos, const char* buffer, _u32 bsize, bool* has_error = NULL);
_u32 WriteNonBlocking( int64 pos, const char* buffer, _u32 bsize, bool* has_error = NULL);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> ReadAsync(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io,
int64 pos, _u32 bsize, unsigned int flags, bool ext_lock);
fuse_io_context::io_uring_task<int> WriteAsync(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io,
int64 pos, _u32 bsize, bool new_block, bool ext_lock, bool can_block);
#endif
virtual bool Seek( _i64 spos );
virtual _i64 Size( void );
#ifdef HAS_ASYNC
virtual fuse_io_context::io_uring_task<_i64> SizeAsync(void);
#endif
virtual _i64 RealSize(void);
virtual bool Resize(int64 nsize);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> ResizeAsync(fuse_io_context& io, int64 nsize);
#endif
virtual bool PunchHole( _i64 spos, _i64 size );
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> PunchHoleAsync(fuse_io_context& io, _i64 spos, _i64 size);
#endif
virtual std::string getFilename( void );
virtual bool Sync();
bool Flush();
bool Flush(bool do_submit, bool for_slog=false);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> FlushAsync(fuse_io_context& io, bool do_submit);
#endif
bool close_bitmaps();
void Reset();
int64 getDirtyBytes();
int64 getSubmittedBytes();
int64 getTotalSubmittedBytes();
int64 getUsedBytes();
std::string getNumDirtyItems();
std::string getNumMemfileItems();
int64 getCacheSize();
std::string getStats();
int64 getCompBytes();
void setDevNames(std::string bdev, std::string ldev);
static std::string block_key(int64 blocknum);
static int64 block_key_rev(const std::string& data);
static std::string big_block_key(int64 blocknum);
static std::string small_block_key(int64 blocknum);
static std::string hex_big_block_key(int64 bytenum);
static std::string hex_small_block_key(int64 bytenum);
int Congested();
int CongestedAsync();
std::string get_raid_groups();
std::string scrub_stats();
void start_scrub(ScrubAction action);
void stop_scrub();
bool add_disk(const std::string& path);
bool remove_disk(const std::string& path, bool completely);
std::string disk_error_info();
int64 current_total_size();
int64 current_free_space();
bool set_target_failure_probability(double t);
bool set_target_overhead(double t);
std::string get_scrub_position();
int64 get_memfile_bytes();
int64 get_submitted_memfile_bytes();
virtual void operator()();
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task_discard<int> run_async(fuse_io_context& io);
#endif
void run_cd_fracture();
bool start_raid_defrag(const std::string& settings);
bool is_background_worker_enabled();
bool is_background_worker_running();
void enable_background_worker(bool b);
void preload(int64 start, int64 stop, size_t n_threads);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> preload_async(fuse_io_context& io, int64 start, int64 stop, size_t n_threads);
#endif
void cmd(const std::string& c);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> cmd_async(fuse_io_context& io, const std::string& c);
#endif
static int64 key_offset_hex(const std::string& key);
static int64 key_offset(const std::string& key, bool& big_block);
static int64 key_offset(const std::string& key);
std::string meminfo();
void shrink_mem();
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> shrink_mem_async(fuse_io_context& io);
#endif
int64 get_total_hits();
int64 get_total_hits_async();
int64 get_total_memory_hits();
int64 get_total_memory_hits_async();
int64 get_total_cache_miss_backend();
int64 get_total_cache_miss_decompress();
int64 get_total_dirty_ops();
int64 get_total_balance_ops();
int64 get_total_del_ops();
int64 get_total_put_ops();
int64 get_total_compress_ops();
bool has_preload_item(int64 offset_start, int64 offset_end);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> has_preload_item_async(fuse_io_context& io, int64 offset_start, int64 offset_end);
#endif
int64 min_block_size();
void set_is_mounted(const std::string& p_mount_path);
bool is_metadata(int64 offset, const std::string& key);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> is_metadata_async(fuse_io_context& io, int64 offset, const std::string& key);
#endif
virtual unsigned int get_num_second_chances(const std::string& key);
virtual bool is_metadata(const std::string& key);
bool update_missing_fs_chunks();
bool migrate(std::vector<char>& buf, int64 offset, int64 len);
bool migrate(const std::string& conf_fn, bool continue_migration);
std::string migration_info();
void preload_key(const std::string& key, int64 offset, int64 len, int tag, bool disable_memfiles, bool load_only);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> preload_key_async(fuse_io_context& io, const std::string& key, int64 offset, int64 len, int tag, bool disable_memfiles, bool load_only);
#endif
int64 get_uploaded_bytes();
int64 get_downloaded_bytes();
int64 get_written_bytes();
int64 get_read_bytes();
std::string get_raid_io_stats();
bool replay_slog();
std::string get_mirror_stats();
std::string get_raid_freespace_stats();
virtual bool hasKey(const std::string& key) override;
private:
void preload_items(const std::string& fn, size_t n_threads, int tag, bool disable_memfiles, bool load_only);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> preload_items_async(fuse_io_context& io, const std::string& fn,
size_t n_threads, int tag, bool disable_memfiles, bool load_only);
#endif
_u32 WriteInt(int64 pos, const char* buffer, _u32 bsize, bool new_block, IScopedLock* ext_lock, bool can_block, bool* has_error);
_u32 ReadInt(int64 pos, char* buffer, _u32 bsize, IScopedLock* ext_lock, unsigned int flags, bool* has_error);
_u32 ReadAligned(IFile* block, int64 pos, char* buffer, _u32 toread, bool* has_read_error);
_u32 WriteAligned(IFile* block, int64 pos, const char* buffer, _u32 towrite);
void perform_deletes();
void open_bitmaps();
bool bitmap_has_big_block(int64 blocknum);
bool bitmap_has_small_block(int64 blocknum);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> bitmap_has_big_block_async(fuse_io_context& io, int64 blocknum);
fuse_io_context::io_uring_task<bool> bitmap_has_small_block_async(fuse_io_context& io, int64 blocknum);
#endif
void lock_extent(IScopedLock& lock, int64 start, int64 length, bool exclusive);
void unlock_extent(IScopedLock& lock, int64 start, int64 length, bool exclusive);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<size_t> lock_extent_async(int64 start, int64 length, bool exclusive);
#endif
void unlock_extent_async(int64 start, int64 length, bool exclusive, size_t lock_idx);
void add_fracture_big_block(int64 b);
void update_fs_chunks(bool update_time, IScopedWriteLock& lock);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> update_fs_chunks_async(fuse_io_context& io, bool update_time);
#endif
bool migrate(const std::string& conf_fn, ISettingsReader* settings);
bool update_migration_settings();
IOnlineKvStore* create_migration_endpoint(const std::string& conf_fn, ISettingsReader* settings,
IBackupFileSystem* migrate_cachefs);
bool slog_write(int64 pos, const char* buffer, _u32 bsize, bool& needs_reset);
bool slog_open();
void purge_jemalloc();
void set_jemalloc_dirty_decay(int64 timems);
/*_u32 prepare_zero_n_sqes(_u32 towrite);
std::vector<io_uring_sqe*> prepare_zeros(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io, _u32 towrite, _u32 peek_add);*/
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<int> empty_pipe(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io);
fuse_io_context::io_uring_task<bool> verify_pipe_empty(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io);
struct AwaiterCoList
{
AwaiterCoList* next;
std::coroutine_handle<> awaiter;
};
fuse_io_context::io_uring_task_discard<int> preload_async_read(fuse_io_context& io, std::vector<std::unique_ptr<fuse_io_context::FuseIo> >& ios, AwaiterCoList*& ios_waiters_head, int64 start, _u32 size);
fuse_io_context::io_uring_task_discard<int> preload_items_single_async(fuse_io_context& io, size_t& available_workers, AwaiterCoList*& worker_waiters_head,
std::string key, int64 offset, _u32 len, int tag, bool disable_memfiles, bool load_only);
struct ReadTask
{
ReadTask(int fd, IFsFile* file, std::string key, uint64_t off, _u32 size)
: fd(fd), file(file), key(std::move(key)), off(off), size(size), flush_mem(false) {}
int fd;
IFsFile* file;
std::string key;
int64 off;
_u32 size;
bool flush_mem;
};
fuse_io_context::io_uring_task<int> complete_read_tasks(fuse_io_context& io, fuse_io_context::FuseIo& fuse_io,
const std::vector<ReadTask>& read_tasks, const size_t flush_mem_n_tasks, const int64 orig_pos, const _u32 bsize,
const unsigned int flags);
#endif
struct SExtent
{
int64 start;
int64 length;
ICondition* cond;
int refcount;
bool alive;
};
std::vector<SExtent> locked_extents;
#ifdef HAS_ASYNC
struct SExtentAsync
{
int64 start;
int64 length;
bool exclusive;
AwaiterCoList* awaiters;
int refcount;
};
std::vector<SExtentAsync> async_locked_extents;
#endif
size_t locked_extents_max_alive;
size_t wait_for_exclusive;
std::unique_ptr<IOnlineKvStore> online_kv_store;
TransactionalKvStore kv_store;
IFsFile* big_blocks_bitmap_file;
IFsFile* bitmap_file;
IFsFile* new_big_blocks_bitmap_file;
IFsFile* old_big_blocks_bitmap_file;
std::unique_ptr<FileBitmap> old_big_blocks_bitmap;
std::unique_ptr<FileBitmap> new_big_blocks_bitmap;
std::unique_ptr<SparseFileBitmap> big_blocks_bitmap;
std::unique_ptr<SparseFileBitmap> bitmap;
int64 cf_pos;
int64 active_big_block;
int64 used_bytes;
int64 last_stat_update_time;
int64 last_flush_check;
int64 cloudfile_size;
std::unique_ptr<IMutex> mutex;
std::string bdev_name;
std::string ldev_name;
size_t writeback_count;
std::string bcache_writeback_percent;
unsigned int io_alignment;
std::map<int64, int64> fracture_big_blogs;
std::unique_ptr<ICondition> thread_cond;
bool exit_thread;
THREADPOOL_TICKET fracture_big_blogs_ticket;
float memory_usage_factor;
int64 bitmaps_file_size;
size_t flush_enabled;
std::string cache_path;
std::string mount_path;
std::unique_ptr<ISharedMutex> chunks_mutex;
#ifdef HAS_MOUNT_SERVICE
std::vector<btrfs_chunks::SChunk> fs_chunks;
#endif
int64 last_fs_chunks_update;
std::vector<std::pair<std::string, int64> > missing_chunk_keys;
bool updating_fs_chunks;
std::unique_ptr<UpdateMissingChunksThread> update_missing_chunks_thread;
THREADPOOL_TICKET update_missing_chunks_thread_ticket;
std::unique_ptr<CloudFile> migrate_to_cf;
std::unique_ptr<IThreadPool> migration_thread_pool;
std::atomic<bool> migration_has_error;
std::atomic<int64> migration_copy_max;
std::atomic<int64> migration_copy_done;
int64 migration_copy_done_lag;
std::string migration_conf_fn;
THREADPOOL_TICKET migration_ticket;
std::set<std::string> in_write_retrieval;
std::unique_ptr<ICondition> in_write_retrieval_cond;
THREADPOOL_TICKET share_with_updater_ticket;
std::unique_ptr<ShareWithUpdater> share_with_updater;
int64 written_bytes;
int64 read_bytes;
std::atomic<int64> slog_size;
std::atomic<int64> slog_last_sync;
std::unique_ptr<IFsFile> slog;
std::string slog_path;
int64 slog_max_size;
bool enable_raid_freespace_stats;
bool is_flushing;
int zero_memfd;
_u32 zero_memfd_size;
#ifdef HAS_ASYNC
AwaiterCoList* exclusive_awaiter_head = nullptr;
struct ExclusiveAwaiter
{
ExclusiveAwaiter(CloudFile& cd)
: cd(cd) {}
ExclusiveAwaiter(ExclusiveAwaiter const&) = delete;
ExclusiveAwaiter(ExclusiveAwaiter&& other) = delete;
ExclusiveAwaiter& operator=(ExclusiveAwaiter&&) = delete;
ExclusiveAwaiter& operator=(ExclusiveAwaiter const&) = delete;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
awaiter.awaiter = p_awaiter;
awaiter.next = cd.exclusive_awaiter_head;
cd.exclusive_awaiter_head = &awaiter;
}
void await_resume() const noexcept
{
}
private:
AwaiterCoList awaiter;
CloudFile& cd;
};
void resume_awaiters(AwaiterCoList*& head)
{
AwaiterCoList* curr = head;
head = nullptr;
while (curr != nullptr)
{
AwaiterCoList* next = curr->next;
curr->awaiter.resume();
curr = next;
}
}
void resume_exclusive_awaiters()
{
resume_awaiters(exclusive_awaiter_head);
}
AwaiterCoList* write_retrieval_head = nullptr;
struct WriteRetrievalAwaiter
{
WriteRetrievalAwaiter(CloudFile& cd)
: cd(cd) {}
WriteRetrievalAwaiter(WriteRetrievalAwaiter const&) = delete;
WriteRetrievalAwaiter(WriteRetrievalAwaiter&& other) = delete;
WriteRetrievalAwaiter& operator=(WriteRetrievalAwaiter&&) = delete;
WriteRetrievalAwaiter& operator=(WriteRetrievalAwaiter const&) = delete;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
awaiter.awaiter = p_awaiter;
awaiter.next = cd.write_retrieval_head;
cd.write_retrieval_head = &awaiter;
}
void await_resume() const noexcept
{
}
private:
AwaiterCoList awaiter;
CloudFile& cd;
};
void resume_write_retrieval_awaiters()
{
resume_awaiters(write_retrieval_head);
}
struct ExtentAwaiter
{
ExtentAwaiter(SExtentAsync& extent)
: extent(extent) {}
ExtentAwaiter(ExtentAwaiter const&) = delete;
ExtentAwaiter(ExtentAwaiter&& other) = delete;
ExtentAwaiter& operator=(ExtentAwaiter&&) = delete;
ExtentAwaiter& operator=(ExtentAwaiter const&) = delete;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
awaiter.awaiter = p_awaiter;
awaiter.next = extent.awaiters;
extent.awaiters = &awaiter;
}
void await_resume() const noexcept
{
}
private:
AwaiterCoList awaiter;
SExtentAsync& extent;
};
struct IosAwaiter
{
IosAwaiter(std::vector<std::unique_ptr<fuse_io_context::FuseIo> >& ios, AwaiterCoList*& ios_awaiter_head)
: ios(ios), ios_awaiter_head(ios_awaiter_head) {}
IosAwaiter(IosAwaiter const&) = delete;
IosAwaiter(IosAwaiter&& other) = delete;
IosAwaiter& operator=(IosAwaiter&&) = delete;
IosAwaiter& operator=(IosAwaiter const&) = delete;
bool await_ready() const noexcept
{
return !ios.empty();
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
awaiter.awaiter = p_awaiter;
awaiter.next = ios_awaiter_head;
ios_awaiter_head = &awaiter;
}
std::unique_ptr<fuse_io_context::FuseIo> await_resume() noexcept
{
std::unique_ptr<fuse_io_context::FuseIo> ret = std::move(ios.back());
ios.pop_back();
return std::move(ret);
}
private:
AwaiterCoList awaiter;
std::vector<std::unique_ptr<fuse_io_context::FuseIo> >& ios;
AwaiterCoList*& ios_awaiter_head;
};
void iosWaitersResume(std::vector<std::unique_ptr<fuse_io_context::FuseIo> >& ios,
AwaiterCoList*& ios_waiters_head)
{
AwaiterCoList* curr = ios_waiters_head;
ios_waiters_head = nullptr;
while (curr != nullptr
&& !ios.empty())
{
AwaiterCoList* next = curr->next;
curr->awaiter.resume();
curr = next;
}
if (curr != nullptr)
{
curr->next = ios_waiters_head;
ios_waiters_head = curr;
}
}
struct WorkerAwaiter
{
WorkerAwaiter(size_t& available_workers, AwaiterCoList*& worker_awaiter_head)
: available_workers(available_workers), worker_awaiter_head(worker_awaiter_head) {}
WorkerAwaiter(WorkerAwaiter const&) = delete;
WorkerAwaiter(WorkerAwaiter&& other) = delete;
WorkerAwaiter& operator=(WorkerAwaiter&&) = delete;
WorkerAwaiter& operator=(WorkerAwaiter const&) = delete;
bool await_ready() const noexcept
{
return available_workers > 0;
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
awaiter.awaiter = p_awaiter;
awaiter.next = worker_awaiter_head;
worker_awaiter_head = &awaiter;
}
void await_resume() noexcept
{
assert(available_workers > 0);
--available_workers;
}
private:
AwaiterCoList awaiter;
size_t& available_workers;
AwaiterCoList*& worker_awaiter_head;
};
void workerWaitersResume(size_t& available_workers,
AwaiterCoList*& ios_waiters_head)
{
AwaiterCoList* curr = ios_waiters_head;
ios_waiters_head = nullptr;
while (curr != nullptr
&& available_workers>0)
{
AwaiterCoList* next = curr->next;
curr->awaiter.resume();
curr = next;
}
if (curr != nullptr)
{
curr->next = ios_waiters_head;
ios_waiters_head = curr;
}
}
#endif //HAS_ASYNC
std::unique_ptr<IFsFile> null_file;
std::unique_ptr<IFsFile> zero_file;
bool is_async;
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task_discard<int> run_async_queue(fuse_io_context& io);
bool get_async_msg(CWData& msg, CRData& resp);
#endif
int queue_in_eventfd;
int update_fs_chunks_eventfd;
std::unique_ptr<IMutex> msg_queue_mutex;
std::unique_ptr<ICondition> msg_queue_cond;
int64 msg_queue_id;
std::queue<std::vector<char> > msg_queue;
std::map<size_t, std::vector<char> > msg_queue_responses;
std::set<int64> consider_big_blocks;
std::set<int64> curr_consider_big_blocks;
bool fallocate_block_file = true;
IBackupFileSystem* cachefs;
};

View File

@ -0,0 +1,68 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "ClouddriveFactory.h"
#include "CloudFile.h"
#include "KvStoreFrontend.h"
#include "KvStoreBackendS3.h"
bool ClouddriveFactory::checkConnectivity(CloudSettings settings, int64 timeoutms)
{
return false;
}
IFile* ClouddriveFactory::createCloudFile(IBackupFileSystem* cachefs, CloudSettings settings)
{
return createCloudFile(cachefs, settings, false);
}
IFile* ClouddriveFactory::createCloudFile(IBackupFileSystem* cachefs, CloudSettings settings, bool check_only)
{
IOnlineKvStore* online_kv_store;
if (settings.endpoint == CloudEndpoint::S3)
{
IKvStoreBackend* s3_backend = new KvStoreBackendS3(settings.encryption_key,
settings.s3_settings.access_key, settings.s3_settings.secret_access_key,
settings.s3_settings.bucket_name,
get_compress_encrypt_factory(),
settings.s3_settings.endpoint,
settings.s3_settings.region,
settings.s3_settings.storage_class,
static_cast<unsigned int>(settings.submit_compression), cachefs);
online_kv_store = new KvStoreFrontend(settings.s3_settings.cache_db_path,
s3_backend, !check_only, std::string(), std::string(), nullptr,
std::string(), false, false, cachefs);
}
else
{
return nullptr;
}
return new CloudFile(std::string(), cachefs,
settings.size, settings.size, online_kv_store, settings.encryption_key,
get_compress_encrypt_factory(), settings.verify_cache, settings.cpu_multiplier,
settings.background_compress, settings.no_compress_cpu_mult,
settings.reserved_cache_device_space, settings.min_metadata_cache_free,
settings.with_prev_link, settings.allow_evict, settings.with_submitted_files,
settings.resubmit_compressed_ratio, settings.memcache_size, std::string(),
settings.memory_usage_factor, settings.only_memfiles, std::string(),
std::string(), static_cast<unsigned int>(settings.background_compression),
std::string(), -1, false);
}

View File

@ -0,0 +1,13 @@
#pragma once
#include "IClouddriveFactory.h"
class ClouddriveFactory : public IClouddriveFactory
{
public:
virtual bool checkConnectivity(CloudSettings settings, int64 timeoutms) override;
virtual IFile* createCloudFile(IBackupFileSystem* cachefs, CloudSettings settings) override;
private:
IFile* createCloudFile(IBackupFileSystem* cachefs, CloudSettings settings, bool check_only);
};

View File

@ -0,0 +1,521 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "CompressEncrypt.h"
#include "../stringtools.h"
#include <stdexcept>
#include "CdZlibCompressor.h"
#include "LzmaCompressor.h"
#include "CdZstdCompressor.h"
#include "../urbackupcommon/os_functions.h"
#include "../urbackupcommon/events.h"
namespace
{
ICompressEncryptFactory* compress_encrypt_factory;
}
void init_compress_encrypt_factory()
{
compress_encrypt_factory = new CompressEncryptFactory;
}
unsigned int CompressionMethodFromString(const std::string & str)
{
if (str == "lzma_5")
return CompressionLzma5;
else if (str == "zlib_5")
return CompressionZlib5;
else if (str == "zstd_3")
return CompressionZstd3;
else if (str == "zstd_19")
return CompressionZstd19;
else if (str == "zstd_9")
return CompressionZstd9;
else
return CompressionZstd3;
}
ICompressEncryptFactory* get_compress_encrypt_factory()
{
return compress_encrypt_factory;
}
bool read_generation( IFile* file, int64 offset, int64& generation)
{
char header[sizeof(unsigned int)+12];
if(file->Read(offset, header, sizeof(header))!=sizeof(header))
{
Server->Log("Error reading file header for generation. "+os_last_error_str(), LL_ERROR);
return false;
}
unsigned int version;
memcpy(&version, header, sizeof(version));
version = little_endian(version);
unsigned int version_part = version & 0x0000FFFF;
if(version_part !=2)
{
Server->Log("Unknown object version: "+convert(version_part)+" while reading file header for generation", LL_ERROR);
return false;
}
generation = 0;
memcpy(&generation, header+sizeof(version)+6, 6);
generation = little_endian(generation);
return true;
}
ICompressAndEncrypt* CompressEncryptFactory::createCompressAndEncrypt(const std::string& encryption_key, IFile* file, IOnlineKvStore* online_kv_store, unsigned int compression_id)
{
ICompressor* compressor;
switch (compression_id)
{
case CompressionLzma5:
#ifdef WITH_LZMA
compressor = new LzmaCompressor;
#else
compressor = new CdZstdCompressor(17, CompressionZstd3);
#endif
break;
case CompressionZlib5:
compressor = new CdZlibCompressor(5, compression_id);
break;
case CompressionZstd3:
case CompressionZstd19:
case CompressionZstd9:
{
int level = 3;
switch (compression_id)
{
case CompressionZstd19:
level = 19;
case CompressionZstd9:
level = 9;
}
compressor = new CdZstdCompressor(level, CompressionZstd3);
} break;
default:
return NULL;
}
return new CompressAndEncrypt(encryption_key, file, online_kv_store, compressor);
}
IDecryptAndDecompress* CompressEncryptFactory::createDecryptAndDecompress(const std::string& encryption_key, IFile* output_file)
{
return new DecryptAndDecompress(encryption_key, output_file);
}
CompressAndEncrypt::CompressAndEncrypt( const std::string& encryption_key, IFile* file, IOnlineKvStore* online_kv_store, ICompressor* compressor)
: file(file), online_kv_store(online_kv_store), encryption(), encryption_filter(encryption),
compression_ended(false), ret_bytes(0), compressor(compressor), file_pos(0),
input_file_size(file->Size())
{
char iv[12];
CryptoPP::AutoSeededRandomPool prng;
prng.GenerateBlock(reinterpret_cast<CryptoPP::byte*>(iv), 6);
generation = online_kv_store->generation_inc(1);
uint64 ugen = static_cast<uint64>(generation);
if( ugen & 0xFFFF000000000000ULL)
{
Server->Log("Generation overflow. There is a small probability of nonce reuse.", LL_INFO);
ugen = ugen ^ ( (ugen >> 16) & 0xFFFF00000000 );
}
ugen = little_endian(ugen);
memcpy(&iv[6], &ugen, 6);
encryption.SetKeyWithIV(reinterpret_cast<const CryptoPP::byte*>(encryption_key.data()), encryption_key.size(),
reinterpret_cast<const CryptoPP::byte*>(iv), sizeof(iv));
unsigned int version=2;
version = version | (compressor->getId() << 16);
output_buffer.resize(sizeof(version) + sizeof(iv));
memcpy(&output_buffer[0], &version, sizeof(version));
memcpy(&output_buffer[sizeof(version)], iv, sizeof(iv));
output_buffer_pos = 0;
const size_t enc_buffer_size = 128 * 1024;
const size_t read_buffer_size = 128 * 1024;
read_buffer.resize(read_buffer_size);
compressed_buffer.resize(enc_buffer_size);
compressor->setOut(compressed_buffer.data(), compressed_buffer.size());
}
size_t CompressAndEncrypt::read( char* buffer, size_t buffer_size )
{
size_t ret_size = 0;
if(!output_buffer.empty())
{
size_t toread = (std::min)(buffer_size, output_buffer.size()-output_buffer_pos);
memcpy(buffer, &output_buffer[output_buffer_pos], toread);
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(toread));
output_buffer_pos+=toread;
buffer+=toread;
buffer_size-=toread;
ret_size+=toread;
if(output_buffer_pos==output_buffer.size())
{
output_buffer.clear();
}
if (buffer_size == 0)
{
return ret_size;
}
}
if(compression_ended)
{
size_t ret_add = encryption_filter.Get(reinterpret_cast<CryptoPP::byte*>(buffer), buffer_size);
if(ret_add>0)
{
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(ret_add));
}
ret_bytes+=ret_size + ret_add;
return ret_size + ret_add;
}
try
{
do
{
if(compressor->getAvailIn()==0 && file!=NULL)
{
bool has_read_error = false;
_u32 read = file->Read(file_pos, read_buffer.data(),
static_cast<_u32>(buffer_size), &has_read_error);
if (has_read_error)
{
std::string msg = "Read error while reading from file "
+ file->getFilename() + " at position " + convert(file_pos)
+ " len " + convert(buffer_size) + " for compression and encryption. " + os_last_error_str();
Server->Log(msg, LL_ERROR);
addSystemEvent("cache_err",
"Error reading from file on cache",
msg, LL_ERROR);
return std::string::npos;
}
if(read>0)
{
compressor->setIn(read_buffer.data(), read);
file_pos += read;
}
else
{
if (file_pos < input_file_size)
{
std::string msg = "Read only " + convert(file_pos) + " of total " +
convert(input_file_size) + " from " + file->getFilename();
Server->Log(msg, LL_ERROR);
addSystemEvent("cache_err",
"Error reading from file on cache",
msg, LL_ERROR);
return std::string::npos;
}
file=NULL;
}
}
int code;
CompressResult ret = compressor->compress(file == NULL, code);
if(compressor->getAvailOut()==0 || ret == CompressResult_End)
{
size_t write_size = compressed_buffer.size() - compressor->getAvailOut();
if(write_size>0)
{
encryption_filter.Put(reinterpret_cast<const CryptoPP::byte*>(compressed_buffer.data()), write_size);
size_t ret_add = encryption_filter.Get(reinterpret_cast<CryptoPP::byte*>(buffer), buffer_size);
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(ret_add));
buffer+=ret_add;
buffer_size-=ret_add;
ret_size+=ret_add;
}
compressor->setOut(compressed_buffer.data(), compressed_buffer.size());
}
if(ret != CompressResult_Ok)
{
if(ret == CompressResult_End)
{
compression_ended=true;
encryption_filter.MessageEnd();
if(buffer_size>0)
{
size_t add_size = read(buffer, buffer_size);
if(add_size==std::string::npos)
{
return add_size;
}
else
{
ret_size+=add_size;
return ret_size;
}
}
}
else
{
Server->Log("Error while compressing (code: "+convert(code)+")", LL_ERROR);
return std::string::npos;
}
}
}
while(ret_size==0);
}
catch(CryptoPP::Exception& e)
{
Server->Log(std::string("Exception during encryption: ")+e.what(), LL_ERROR);
return std::string::npos;
}
ret_bytes+=ret_size;
return ret_size;
}
int64 CompressAndEncrypt::get_generation()
{
return generation;
}
std::string CompressAndEncrypt::md5sum()
{
md5.finalize();
return std::string(reinterpret_cast<char*>(md5.raw_digest_int()), 16);
}
DecryptAndDecompress::DecryptAndDecompress( const std::string& encryption_key, IFile* output_file ) : read_state(EReadState_Version), header_buf_pos(0), decryption(), decryption_filter(decryption),
encryption_key(encryption_key), output_file(output_file), file_pos(0)
{
const size_t buffer_size = 128*1024;
output_buf.resize(buffer_size);
decrypted_buffer.resize(buffer_size);
}
bool DecryptAndDecompress::put( char* buffer, size_t buffer_size )
{
if(read_state==EReadState_Version)
{
size_t toread = (std::min)(sizeof(version)-header_buf_pos, buffer_size);
memcpy(header_buf, buffer, toread);
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(toread));
header_buf_pos+=toread;
if(header_buf_pos==sizeof(version))
{
header_buf_pos=0;
read_state = EReadState_Iv;
memcpy(&version, header_buf, sizeof(version));
unsigned int version_part = version & 0x0000FFFF;
if(version_part ==1)
{
iv_size = iv_size_v1;
}
else if(version_part ==2)
{
iv_size = iv_size_v2;
}
else
{
Server->Log("Unknown block version: "+convert(version_part), LL_ERROR);
return false;
}
if (!init_decompression((version & 0xFFFF0000) >> 16))
{
Server->Log("Error during decompression init. Decompressor id " + convert((version & 0xFFFF0000) >> 16), LL_ERROR);
return false;
}
if(buffer_size>toread)
{
return put(buffer+toread, buffer_size-toread);
}
}
}
else if(read_state == EReadState_Iv)
{
size_t toread = (std::min)(iv_size-header_buf_pos, buffer_size);
memcpy(header_buf, buffer, toread);
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(toread));
header_buf_pos+=toread;
if(header_buf_pos==iv_size)
{
read_state = EReadState_Data;
decryption.SetKeyWithIV(reinterpret_cast<const CryptoPP::byte*>(encryption_key.data()), encryption_key.size(),
reinterpret_cast<const CryptoPP::byte*>(header_buf), iv_size);
if(buffer_size>toread)
{
return put(buffer+toread, buffer_size-toread);
}
}
return true;
}
md5.update(reinterpret_cast<unsigned char*>(buffer), static_cast<unsigned int>(buffer_size));
try
{
decryption_filter.Put(reinterpret_cast<CryptoPP::byte*>(buffer), buffer_size);
return decrypt();
}
catch(CryptoPP::Exception& e)
{
Server->Log(std::string("Exception during decryption: ")+e.what(), LL_ERROR);
return false;
}
}
bool DecryptAndDecompress::finalize()
{
try
{
decryption_filter.MessageEnd();
while(decryption_filter.AnyRetrievable())
{
if(!decrypt())
{
return false;
}
}
return true;
}
catch(CryptoPP::Exception& e)
{
Server->Log(std::string("Exception during decryption (finalize): ")+e.what(), LL_ERROR);
return false;
}
}
std::string DecryptAndDecompress::md5sum()
{
md5.finalize();
return md5.hex_digest();
}
bool DecryptAndDecompress::decrypt()
{
size_t decrypted_size = decryption_filter.Get(reinterpret_cast<CryptoPP::byte*>(decrypted_buffer.data()), decrypted_buffer.size());
decompressor->setIn(decrypted_buffer.data(), decrypted_size);
while (decompressor->getAvailIn()>0)
{
int code;
DecompressResult ret = decompressor->decompress(code);
if(decompressor->getAvailOut()==0 || ret==DecompressResult_End )
{
size_t write_size = output_buf.size() - decompressor->getAvailOut();
if(write_size>0)
{
if(output_file->Write(file_pos, output_buf.data(), static_cast<_u32>(write_size) )!=write_size)
{
Server->Log("Error writing data to output file. "+os_last_error_str(), LL_ERROR);
return false;
}
file_pos += write_size;
}
decompressor->setOut(output_buf.data(), output_buf.size());
}
if(ret!= DecompressResult_Ok)
{
if (ret == DecompressResult_End)
{
return true;
}
else
{
Server->Log("Error while decompressing (code: "+convert(code)+")", LL_ERROR);
return false;
}
}
}
return true;
}
bool DecryptAndDecompress::init_decompression(unsigned int decompressor_id)
{
switch (decompressor_id)
{
#ifdef WITH_LZMA
case CompressionLzma5:
decompressor.reset(new LzmaDecompressor);
break;
#endif
case CompressionZlib5:
decompressor.reset(new CdZlibDecompressor);
break;
case CompressionZstd3:
decompressor.reset(new CdZstdDecompressor);
break;
default:
return false;
}
decompressor->setOut(output_buf.data(), output_buf.size());
return true;
}

View File

@ -0,0 +1,142 @@
#pragma once
#include "ICompressEncrypt.h"
#include "../cryptoplugin/cryptopp_inc.h"
#include "../md5.h"
class CompressEncryptFactory : public ICompressEncryptFactory
{
public:
ICompressAndEncrypt* createCompressAndEncrypt(const std::string& encryption_key, IFile* file, IOnlineKvStore* online_kv_store, unsigned int compression_id);
IDecryptAndDecompress* createDecryptAndDecompress(const std::string& encryption_key, IFile* output_file);
};
void init_compress_encrypt_factory();
namespace
{
enum CompressResult
{
CompressResult_Ok,
CompressResult_End,
CompressResult_Other
};
enum DecompressResult
{
DecompressResult_Ok,
DecompressResult_End,
DecompressResult_Other
};
}
class ICompressor : public IObject
{
public:
virtual void setOut(char* next_out, size_t avail_out) = 0 ;
virtual void setIn(char* next_in, size_t avail_in) = 0;
virtual size_t getAvailOut() = 0;
virtual size_t getAvailIn() = 0;
virtual CompressResult compress(bool finish, int& code) = 0;
virtual unsigned int getId() = 0;
};
class IDecompressor : public IObject
{
public:
virtual void setOut(char* next_out, size_t avail_out) = 0;
virtual void setIn(char* next_in, size_t avail_in) = 0;
virtual size_t getAvailOut() = 0;
virtual size_t getAvailIn() = 0;
virtual DecompressResult decompress(int& code) = 0;
};
class CompressAndEncrypt : public ICompressAndEncrypt
{
public:
CompressAndEncrypt(const std::string& encryption_key, IFile* file, IOnlineKvStore* online_kv_store, ICompressor* compressor);
size_t read(char* buffer, size_t buffer_size);
int64 get_generation();
std::string md5sum();
size_t readBytes()
{
return ret_bytes;
}
private:
std::vector<char> output_buffer;
size_t output_buffer_pos;
IFile* file;
int64 file_pos;
IOnlineKvStore* online_kv_store;
CryptoPP::GCM< CryptoPP::AES >::Encryption encryption;
CryptoPP::AuthenticatedEncryptionFilter encryption_filter;
std::unique_ptr<ICompressor> compressor;
std::vector<char> read_buffer;
std::vector<char> compressed_buffer;
bool compression_ended;
int64 generation;
size_t ret_bytes;
int64 input_file_size;
MD5 md5;
};
const size_t iv_size_v1 = 24;
const size_t iv_size_v2 = 12;
enum EReadState
{
EReadState_Version,
EReadState_Iv,
EReadState_Data
};
class DecryptAndDecompress : public IDecryptAndDecompress
{
public:
DecryptAndDecompress(const std::string& encryption_key, IFile* output_file);
bool put(char* buffer, size_t buffer_size);
bool finalize();
std::string md5sum();
private:
bool decrypt();
bool init_decompression(unsigned int decompressor_id);
unsigned int version;
size_t iv_size;
EReadState read_state;
char header_buf[30];
size_t header_buf_pos;
std::unique_ptr<IDecompressor> decompressor;
std::vector<char> decrypted_buffer;
std::vector<char> output_buf;
CryptoPP::GCM< CryptoPP::AES >::Decryption decryption;
CryptoPP::AuthenticatedDecryptionFilter decryption_filter;
std::string encryption_key;
IFile* output_file;
int64 file_pos;
MD5 md5;
};
bool read_generation( IFile* file, int64 offset, int64& generation);

View File

@ -0,0 +1,65 @@
#pragma once
#include "../Interface/Plugin.h"
#include "../Interface/File.h"
#include "../Interface/BackupFileSystem.h"
class IClouddriveFactory : public IPlugin
{
public:
enum class CloudEndpoint
{
S3
};
enum class CompressionMethod
{
lzma_5 = 0,
zlib_5 = 1,
zstd_3 = 2,
zstd_19 = 3,
zstd_9 = 4
};
struct CloudSettingsS3
{
std::string access_key;
std::string secret_access_key;
std::string bucket_name;
std::string endpoint;
std::string region;
std::string storage_class;
std::string cache_db_path;
};
struct CloudSettings
{
int64 size = -1;
int64 memcache_size = 0;
bool background_compress = false;
bool with_prev_link = true;
bool allow_evict = true;
std::string encryption_key;
float resubmit_compressed_ratio = 0.8f;
bool verify_cache = false;
CompressionMethod submit_compression = CompressionMethod::zstd_9;
CompressionMethod background_compression = CompressionMethod::lzma_5;
bool multi_trans_delete = true;
int64 reserved_cache_device_space = 100 * 1024 * 1024;
float memory_usage_factor = 0.1f;
float cpu_multiplier = 1.0f;
size_t no_compress_cpu_mult = 1;
int64 min_metadata_cache_free = 100 * 1024 * 1024;
bool with_submitted_files = true;
bool only_memfiles = false;
CloudEndpoint endpoint;
CloudSettingsS3 s3_settings;
};
virtual bool checkConnectivity(CloudSettings settings,
int64 timeoutms) = 0;
virtual IFile* createCloudFile(IBackupFileSystem* cachefs,
CloudSettings settings) = 0;
};

View File

@ -0,0 +1,46 @@
#pragma once
#include "../Interface/File.h"
#include "../Interface/Object.h"
#include <string>
#include "IOnlineKvStore.h"
class ICompressAndEncrypt : public IObject
{
public:
virtual size_t read(char* buffer, size_t buffer_size) = 0;
virtual int64 get_generation() = 0;
virtual std::string md5sum() = 0;
virtual size_t readBytes() = 0;
};
class IDecryptAndDecompress : public IObject
{
public:
virtual bool put(char* buffer, size_t buffer_size) = 0;
virtual bool finalize() = 0;
virtual std::string md5sum() = 0;
};
class IOnlineKvStore;
namespace
{
const unsigned int CompressionLzma5 = 0;
const unsigned int CompressionZlib5 = 1;
const unsigned int CompressionZstd3 = 2;
const unsigned int CompressionZstd19 = 3;
const unsigned int CompressionZstd9 = 4;
}
unsigned int CompressionMethodFromString(const std::string& str);
class ICompressEncryptFactory : public IObject
{
public:
virtual ICompressAndEncrypt* createCompressAndEncrypt(const std::string& encryption_key, IFile* file, IOnlineKvStore* online_kv_store, unsigned int compression_id) = 0;
virtual IDecryptAndDecompress* createDecryptAndDecompress(const std::string& encryption_key, IFile* output_file) = 0;
};
ICompressEncryptFactory* get_compress_encrypt_factory();

View File

@ -0,0 +1,92 @@
#pragma once
#include <string>
#include <functional>
#include "../Interface/Types.h"
#include "../Interface/File.h"
#include "../Interface/Object.h"
class IOnlineKvStore;
class IKvStoreBackend : public IObject
{
public:
class IListCallback
{
public:
virtual bool onlineItem(const std::string& key, const std::string& md5sum, int64 size, int64 last_modified)=0;
};
const static unsigned int GetDecrypted = 1;
const static unsigned int GetRebalance = 2;
const static unsigned int GetScrub = 4;
const static unsigned int GetPrioritize = 8;
const static unsigned int GetReadahead = 16;
const static unsigned int GetUnsynced = 32;
const static unsigned int GetRebuild = 64;
const static unsigned int GetIgnoreReadErrors = 128;
const static unsigned int GetPrependMd5sum = 256;
const static unsigned int GetBackground = 512;
const static unsigned int GetNoThrottle = 1024;
const static unsigned int GetMetadata = 2048;
const static unsigned int GetStatusRepaired = 1;
const static unsigned int GetStatusEnospc = 2;
const static unsigned int GetStatusNotFound = 4;
const static unsigned int GetStatusRepairError = 8;
virtual bool get( const std::string& key, const std::string& path, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile*& ret_file, std::string& ret_md5sum, unsigned int& get_status) = 0;
virtual bool list(IListCallback* callback) = 0;
const static unsigned int PutAlreadyCompressedEncrypted = 1;
const static unsigned int PutMetadata = 2;
virtual bool put( const std::string& key, IFsFile* src, const std::string& path,
unsigned int flags, bool allow_error_event, std::string& md5sum, int64& size) = 0;
enum class key_next_action_t
{
next,
reset,
clear
};
using key_next_fun_t = std::function<bool(key_next_action_t action, std::string* key)>;
virtual bool del(key_next_fun_t key_next_fun,
bool background_queue) = 0;
using locinfo_next_fun_t = std::function<bool(key_next_action_t action, std::string* locinfo)>;
virtual bool del(key_next_fun_t key_next_fun,
locinfo_next_fun_t locinfo_next_fun,
bool background_queue) = 0;
virtual bool need_curr_del() = 0;
virtual size_t max_del_size() = 0;
virtual size_t num_del_parallel() = 0;
virtual size_t num_scrub_parallel() = 0;
virtual bool sync(bool sync_test, bool background_queue) = 0;
virtual bool is_put_sync() = 0;
virtual bool has_transactions() = 0;
virtual bool prefer_sequential_read() = 0;
virtual bool del_with_location_info() = 0;
virtual bool ordered_del() = 0;
virtual bool can_read_unsynced() = 0;
virtual void setFrontend(IOnlineKvStore* online_kv_store, bool do_init) = 0;
virtual std::string meminfo() = 0;
virtual bool check_deleted(const std::string& key, const std::string& locinfo) = 0;
virtual int64 get_uploaded_bytes()=0;
virtual int64 get_downloaded_bytes()=0;
virtual bool want_put_metadata() = 0;
virtual bool fast_write_retry() = 0;
};

View File

@ -0,0 +1,68 @@
#pragma once
#include "../Interface/File.h"
#include "../Interface/Object.h"
#include "../Interface/SharedMutex.h"
class IOnlineKvStore : public IObject
{
public:
virtual IFsFile* get(const std::string& key, int64 transid, const std::string& path,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event,
bool& not_found, int64* get_transid=nullptr) = 0;
//0 if not implemented -- allowed to have 0 as false negative
virtual int64 get_transid(const std::string& key, int64 transid) = 0;
virtual bool reset(const std::string& key, int64 transid) = 0;
const static unsigned int PutAlreadyCompressedEncrypted = 1;
const static unsigned int PutMetadata = 2;
virtual bool put(const std::string& key, int64 transid, IFsFile* src,
const std::string& path, unsigned int flags,
bool allow_error_event, int64& compressed_size) = 0;
virtual int64 new_transaction(bool allow_error_event) = 0;
virtual bool transaction_finalize(int64 transid, bool complete, bool allow_error_event) = 0;
virtual bool set_active_transactions(const std::vector<int64>& active_transactions) = 0;
virtual bool del(const std::vector<std::string>& keys, int64 transid) = 0;
virtual size_t max_del_size() = 0;
virtual int64 generation_inc(int64 inc) = 0;
virtual std::string get_stats() = 0;
virtual bool sync() = 0;
virtual bool sync_db() = 0;
virtual bool sync_lock(IScopedWriteLock& lock) = 0;
virtual bool is_put_sync() = 0;
virtual std::string meminfo() = 0;
virtual bool has_backend_key(const std::string& key, std::string& md5sum, bool update_md5sum) = 0;
virtual int64 get_uploaded_bytes() = 0;
virtual int64 get_downloaded_bytes() = 0;
virtual bool want_put_metadata() = 0;
virtual bool fast_write_retry() = 0;
class IHasKeyCallback
{
public:
virtual bool hasKey(const std::string& key) = 0;
};
virtual bool submit_del(IHasKeyCallback* has_key_callback, int64 ctransid, bool& need_flush) = 0;
virtual void submit_del_post_flush() = 0;
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,133 @@
#pragma once
#include <memory>
#include "IKvStoreBackend.h"
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/StorageClass.h>
#include "ICompressEncrypt.h"
#include "../Interface/Mutex.h"
#include <stack>
#include "../common/relaxed_atomic.h"
class IOnlineKvStore;
class IBackupFileSystem;
class KvStoreBackendS3 : public IKvStoreBackend
{
public:
KvStoreBackendS3(const std::string& encryption_key, const std::string& access_key, const std::string& secret_access_key,
const std::string& bucket_name, ICompressEncryptFactory* compress_encrypt_factory, const std::string& s3_endpoint,
const std::string& s3_region, const std::string& p_storage_class, unsigned int comp_method,
IBackupFileSystem* cachefs);
static void init_mutex();
virtual bool get( const std::string& key, const std::string& path, const std::string& md5sum,
unsigned int flags, bool allow_error_event, IFsFile*& ret_file, std::string& ret_md5sum, unsigned int& get_status);
virtual bool list( IListCallback* callback );
virtual bool put( const std::string& key, IFsFile* src, const std::string& path,
unsigned int flags, bool allow_error_event, std::string& md5sum, int64& compressed_size);
virtual bool del( key_next_fun_t key_next_fun,
bool background_queue);
virtual bool del( key_next_fun_t key_next_fun,
locinfo_next_fun_t locinfo_next_fun,
bool background_queue)
{
return del(key_next_fun, background_queue);
}
virtual size_t max_del_size() { return 100; }
virtual size_t num_del_parallel() { return 1; }
virtual size_t num_scrub_parallel() { return 1; };
virtual void setFrontend(IOnlineKvStore* online_kv_store, bool do_init);
virtual bool sync(bool sync_test, bool background_queue) { return true; }
virtual bool is_put_sync() { return true; }
virtual bool has_transactions() { return false; }
virtual bool prefer_sequential_read() { return false; }
virtual bool del_with_location_info() { return false; }
virtual bool ordered_del() { return false; }
virtual bool can_read_unsynced() {
return true;
}
virtual std::string meminfo();
virtual bool check_deleted(const std::string& key, const std::string& locinfo)
{
//not implemented
return false;
}
virtual bool need_curr_del(){ return false; }
virtual int64 get_uploaded_bytes() {
return uploaded_bytes;
}
virtual int64 get_downloaded_bytes() {
return downloaded_bytes;
}
void add_uploaded_bytes(int64 n) {
uploaded_bytes+=n;
}
virtual bool want_put_metadata() { return false; }
virtual bool fast_write_retry() { return false; }
private:
std::string encryption_key;
std::pair<int64, std::shared_ptr<Aws::S3::S3Client> > getS3Client(size_t idx, bool useVirtualAdressing=true);
std::pair<int64, std::shared_ptr<Aws::S3::S3Client> > newS3Client(size_t idx, int64 curr_requesttimeout, bool useVirtualAdressing);
void releaseS3Client(size_t idx, std::pair<int64, std::shared_ptr<Aws::S3::S3Client> > client);
void resetClient();
virtual bool del_int( key_next_fun_t key_next_fun,
bool shard_optimized);
void fixError(Aws::S3::S3Errors error);
struct SBucket
{
std::string name;
Aws::S3::Model::BucketLocationConstraint location;
};
std::vector<SBucket> buckets;
std::string s3_endpoint;
std::string s3_region;
Aws::S3::Model::StorageClass storage_class;
static int64 max_request_timems;
static int64 n_requests;
static IMutex* client_mutex;
std::vector<std::stack<std::pair<int64, std::shared_ptr<Aws::S3::S3Client> > > > s3_clients;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
ICompressEncryptFactory* compress_encrypt_factory;
IOnlineKvStore* online_kv_store;
unsigned int comp_method;
relaxed_atomic<int64> uploaded_bytes;
relaxed_atomic<int64> downloaded_bytes;
IBackupFileSystem* cachefs;
};

2726
clouddrive/KvStoreDao.cpp Normal file

File diff suppressed because it is too large Load Diff

324
clouddrive/KvStoreDao.h Normal file
View File

@ -0,0 +1,324 @@
#pragma once
#include "../Interface/Database.h"
class KvStoreDao
{
public:
KvStoreDao(IDatabase *db);
~KvStoreDao();
IDatabase* getDb();
void createTables();
//@-SQLGenFunctionsBegin
struct CdDelObject
{
int64 trans_id;
std::string tkey;
};
struct CdDelObjectMd5
{
int64 trans_id;
std::string tkey;
std::string md5sum;
};
struct CdIterObject
{
int64 trans_id;
std::string tkey;
std::string md5sum;
int64 size;
int64 last_modified;
};
struct CdIterObject2
{
int64 id;
int64 trans_id;
std::string tkey;
std::string md5sum;
int64 size;
};
struct CdObject
{
bool exists;
int64 trans_id;
int64 size;
std::string md5sum;
};
struct CdSingleObject
{
bool exists;
std::string tkey;
int64 trans_id;
int64 size;
std::string md5sum;
};
struct CondInt64
{
bool exists;
int64 value;
};
struct CondString
{
bool exists;
std::string value;
};
struct SCdTrans
{
int64 id;
int completed;
int active;
};
struct SDelItemMd5
{
std::string tkey;
std::string md5sum;
};
struct SSize
{
bool exists;
int64 size;
int64 count;
};
struct STransactionProperties
{
bool exists;
int active;
int completed;
int64 cd_id;
};
struct SUnmirrored
{
bool exists;
int64 tsize;
int64 tcount;
};
struct Task
{
bool exists;
int64 id;
int task_id;
int64 trans_id;
int64 cd_id;
};
void createTransactionTable(void);
void createTransactionTableCd(void);
void createObjectTable(void);
void createObjectTableCd(void);
void createObjectTransIdIdx(void);
void createObjectCdTransIdIdx(void);
bool createObjectLastModifiedIdx(void);
bool createObjectCdLastModifiedIdx(void);
void dropObjectLastModifiedIdx(void);
void dropObjectCdLastModifiedIdx(void);
void createGenerationTable(void);
void createGenerationTableCd(void);
void createTaskTable(void);
void createMiscTable(void);
void setTaskActive(int64 id);
Task getActiveTask(void);
std::vector<Task> getTasks(int64 created_max, int task_id, int64 cd_id);
Task getTask(int64 created_max);
void removeTask(int64 id);
void addTask(int task_id, int64 trans_id, int64 created, int64 cd_id);
std::vector<SCdTrans> getTransactionIds(void);
std::vector<SCdTrans> getTransactionIdsCd(int64 cd_id);
SSize getSize(void);
int64 getSizePartial(const std::string& tkey, int64 tans_id);
int64 getSizePartialLMInit(int64 last_modified_start);
int64 getSizePartialLM(int64 last_modified_start, int64 last_modified_stop);
void setTransactionActive(int active, int64 id);
void setTransactionActiveCd(int active, int64 cd_id, int64 id);
CondInt64 getMaxCompleteTransaction(void);
CondInt64 getMaxCompleteTransactionCd(int64 cd_id);
std::vector<int64> getIncompleteTransactions(int64 max_active);
std::vector<int64> getIncompleteTransactionsCd(int64 max_active, int64 cd_id);
bool deleteTransaction(int64 id);
bool deleteTransactionCd(int64 cd_id, int64 id);
std::vector<SDelItemMd5> getTransactionObjectsMd5(int64 trans_id);
std::vector<SDelItemMd5> getTransactionObjectsMd5Cd(int64 cd_id, int64 trans_id);
std::vector<std::string> getTransactionObjects(int64 trans_id);
std::vector<std::string> getTransactionObjectsCd(int64 cd_id, int64 trans_id);
bool deleteTransactionObjects(int64 trans_id);
bool deleteTransactionObjectsCd(int64 cd_id, int64 trans_id);
void newTransaction(void);
void newTransactionCd(int64 cd_id);
void insertTransaction(int64 id);
void insertTransactionCd(int64 id, int64 cd_id);
void setTransactionComplete(int completed, int64 trans_id);
void setTransactionCompleteCd(int completed, int64 cd_id, int64 trans_id);
std::vector<int64> getDeletableTransactions(int64 curr_trans_id);
std::vector<int64> getDeletableTransactionsCd(int64 cd_id, int64 curr_trans_id);
std::vector<int64> getLastFinalizedTransactions(int64 last_trans_id, int64 curr_complete_trans_id);
std::vector<int64> getLastFinalizedTransactionsCd(int64 cd_id, int64 last_trans_id, int64 curr_complete_trans_id);
std::vector<CdDelObjectMd5> getDeletableObjectsMd5Ordered(int64 curr_trans_id);
std::vector<CdDelObjectMd5> getDeletableObjectsMd5Cd(int64 cd_id, int64 curr_trans_id);
std::vector<CdDelObjectMd5> getDeletableObjectsMd5(int64 curr_trans_id);
std::vector<CdDelObject> getDeletableObjectsOrdered(int64 curr_trans_id);
std::vector<CdDelObject> getDeletableObjects(int64 curr_trans_id);
bool deleteDeletableObjects(int64 curr_trans_id);
bool deleteDeletableObjectsCd(int64 cd_id, int64 curr_trans_id);
void addDelMarkerObject(int64 trans_id, const std::string& tkey);
void addDelMarkerObjectCd(int64 cd_id, int64 trans_id, const std::string& tkey);
bool addObject(int64 trans_id, const std::string& tkey, const std::string& md5sum, int64 size);
bool addObjectCd(int64 cd_id, int64 trans_id, const std::string& tkey, const std::string& md5sum, int64 size);
bool addObject2(int64 trans_id, const std::string& tkey, const std::string& md5sum, int64 size, int64 last_modified);
bool addObject2Cd(int64 cd_id, int64 trans_id, const std::string& tkey, const std::string& md5sum, int64 size, int64 last_modified);
bool addPartialObject(int64 trans_id, const std::string& tkey);
bool addPartialObjectCd(int64 cd_id, int64 trans_id, const std::string& tkey);
bool updateObjectSearch(const std::string& md5sum, int64 size, const std::string& tkey, int64 trans_id);
bool updateObject(const std::string& md5sum, int64 size, int64 rowid);
bool updateObjectCd(const std::string& md5sum, int64 size, int64 rowid);
bool updateObject2Cd(const std::string& md5sum, int64 size, int64 last_modified, int64 rowid);
bool updateObject2(const std::string& md5sum, int64 size, int64 last_modified, int64 rowid);
void deletePartialObject(int64 rowid);
void deletePartialObjectCd(int64 rowid);
void updateGeneration(int64 generation);
CondInt64 getGeneration(void);
CondInt64 getGenerationCd(int64 cd_id);
void insertGeneration(int64 generation);
CdObject getObjectInTransid(int64 trans_id, const std::string& tkey);
CdObject getObjectInTransidCd(int64 cd_id, int64 trans_id, const std::string& tkey);
CdSingleObject getSingleObject(void);
CdObject getObject(int64 curr_trans_id, const std::string& tkey);
CdObject getObjectCd(int64 cd_id, int64 curr_trans_id, const std::string& tkey);
CondInt64 isTransactionActive(int64 trans_id);
CondInt64 isTransactionActiveCd(int64 cd_id, int64 trans_id);
bool deleteObject(int64 trans_id, const std::string& tkey);
CondString getMiscValue(const std::string& key);
void setMiscValue(const std::string& key, const std::string& value);
STransactionProperties getTransactionProperties(int64 id);
STransactionProperties getTransactionPropertiesCd(int64 cd_id, int64 id);
std::vector<CdIterObject> getInitialObjectsLM(void);
std::vector<CdIterObject> getInitialObjects(void);
std::vector<CdIterObject> getIterObjectsLMInit(int64 last_modified_start);
std::vector<CdIterObject> getIterObjectsLM(int64 last_modified_start, int64 last_modified_stop);
std::vector<CdIterObject> getIterObjects(const std::string& tkey, int64 tans_id);
void updateObjectMd5sum(const std::string& md5sum, int64 transid, const std::string& tkey);
void updateObjectMd5sumCd(const std::string& md5sum, int64 cd_id, int64 transid, const std::string& tkey);
void insertAllDeletionTasks(void);
std::vector<CdIterObject2> getUnmirroredObjects(void);
SUnmirrored getUnmirroredObjectsSize(void);
void setObjectMirrored(int64 rowid);
std::vector<SCdTrans> getUnmirroredTransactions(void);
void setTransactionMirrored(int64 id);
bool updateGenerationCd(int64 cd_id, int64 generation);
//@-SQLGenFunctionsEnd
IQuery* getUpdateGenerationQuery();
void setUpdateGenerationQuery(IQuery* q);
private:
//@-SQLGenVariablesBegin
IQuery* q_createTransactionTable;
IQuery* q_createTransactionTableCd;
IQuery* q_createObjectTable;
IQuery* q_createObjectTableCd;
IQuery* q_createObjectTransIdIdx;
IQuery* q_createObjectCdTransIdIdx;
IQuery* q_createObjectLastModifiedIdx;
IQuery* q_createObjectCdLastModifiedIdx;
IQuery* q_dropObjectLastModifiedIdx;
IQuery* q_dropObjectCdLastModifiedIdx;
IQuery* q_createGenerationTable;
IQuery* q_createGenerationTableCd;
IQuery* q_createTaskTable;
IQuery* q_createMiscTable;
IQuery* q_setTaskActive;
IQuery* q_getActiveTask;
IQuery* q_getTasks;
IQuery* q_getTask;
IQuery* q_removeTask;
IQuery* q_addTask;
IQuery* q_getTransactionIds;
IQuery* q_getTransactionIdsCd;
IQuery* q_getSize;
IQuery* q_getSizePartial;
IQuery* q_getSizePartialLMInit;
IQuery* q_getSizePartialLM;
IQuery* q_setTransactionActive;
IQuery* q_setTransactionActiveCd;
IQuery* q_getMaxCompleteTransaction;
IQuery* q_getMaxCompleteTransactionCd;
IQuery* q_getIncompleteTransactions;
IQuery* q_getIncompleteTransactionsCd;
IQuery* q_deleteTransaction;
IQuery* q_deleteTransactionCd;
IQuery* q_getTransactionObjectsMd5;
IQuery* q_getTransactionObjectsMd5Cd;
IQuery* q_getTransactionObjects;
IQuery* q_getTransactionObjectsCd;
IQuery* q_deleteTransactionObjects;
IQuery* q_deleteTransactionObjectsCd;
IQuery* q_newTransaction;
IQuery* q_newTransactionCd;
IQuery* q_insertTransaction;
IQuery* q_insertTransactionCd;
IQuery* q_setTransactionComplete;
IQuery* q_setTransactionCompleteCd;
IQuery* q_getDeletableTransactions;
IQuery* q_getDeletableTransactionsCd;
IQuery* q_getLastFinalizedTransactions;
IQuery* q_getLastFinalizedTransactionsCd;
IQuery* q_getDeletableObjectsMd5Ordered;
IQuery* q_getDeletableObjectsMd5Cd;
IQuery* q_getDeletableObjectsMd5;
IQuery* q_getDeletableObjectsOrdered;
IQuery* q_getDeletableObjects;
IQuery* q_deleteDeletableObjects;
IQuery* q_deleteDeletableObjectsCd;
IQuery* q_addDelMarkerObject;
IQuery* q_addDelMarkerObjectCd;
IQuery* q_addObject;
IQuery* q_addObjectCd;
IQuery* q_addObject2;
IQuery* q_addObject2Cd;
IQuery* q_addPartialObject;
IQuery* q_addPartialObjectCd;
IQuery* q_updateObjectSearch;
IQuery* q_updateObject;
IQuery* q_updateObjectCd;
IQuery* q_updateObject2Cd;
IQuery* q_updateObject2;
IQuery* q_deletePartialObject;
IQuery* q_deletePartialObjectCd;
IQuery* q_updateGeneration;
IQuery* q_getGeneration;
IQuery* q_getGenerationCd;
IQuery* q_insertGeneration;
IQuery* q_getObjectInTransid;
IQuery* q_getObjectInTransidCd;
IQuery* q_getSingleObject;
IQuery* q_getObject;
IQuery* q_getObjectCd;
IQuery* q_isTransactionActive;
IQuery* q_isTransactionActiveCd;
IQuery* q_deleteObject;
IQuery* q_getMiscValue;
IQuery* q_setMiscValue;
IQuery* q_getTransactionProperties;
IQuery* q_getTransactionPropertiesCd;
IQuery* q_getInitialObjectsLM;
IQuery* q_getInitialObjects;
IQuery* q_getIterObjectsLMInit;
IQuery* q_getIterObjectsLM;
IQuery* q_getIterObjects;
IQuery* q_updateObjectMd5sum;
IQuery* q_updateObjectMd5sumCd;
IQuery* q_insertAllDeletionTasks;
IQuery* q_getUnmirroredObjects;
IQuery* q_getUnmirroredObjectsSize;
IQuery* q_setObjectMirrored;
IQuery* q_getUnmirroredTransactions;
IQuery* q_setTransactionMirrored;
IQuery* q_updateGenerationCd;
//@-SQLGenVariablesEnd
void prepareQueries();
IDatabase* db;
};

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,716 @@
#pragma once
#include "IOnlineKvStore.h"
#include "IKvStoreBackend.h"
#include "KvStoreDao.h"
#include "../Interface/Thread.h"
#include "../Interface/Mutex.h"
#include "../Interface/Condition.h"
#include "../Interface/SharedMutex.h"
#include "../stringtools.h"
#include <memory>
#include <set>
#include <queue>
#include "../common/relaxed_atomic.h"
enum class ScrubAction
{
Balance,
Rebuild,
Scrub
};
namespace
{
std::string strScrubAction(ScrubAction a) {
switch (a) {
case ScrubAction::Balance: return "balance";
case ScrubAction::Rebuild: return "rebuild";
case ScrubAction::Scrub: return "scrub";
default: return "undef";
}
}
std::string strScrubActionC(ScrubAction a)
{
std::string ret = strScrubAction(a);
if (!ret.empty())
ret[0] = static_cast<char>(toupper(ret[0]));
return ret;
}
}
class IBackupFileSystem;
class KvStoreFrontend : public IOnlineKvStore, public IThread
{
public:
KvStoreFrontend(const std::string& db_path, IKvStoreBackend* backend, bool import,
const std::string& scrub_continue, const std::string& scrub_continue_position, IKvStoreBackend* backend_mirror, std::string mirror_window,
bool background_worker_manual_run, bool background_worker_multi_trans_delete, IBackupFileSystem* cachefs);
~KvStoreFrontend();
virtual IFsFile* get(const std::string& key, int64 transid, const std::string& path,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event, bool& not_found,
int64* get_transid = nullptr) {
return get(0, key, transid, path, prioritize_read, tmpl_file, allow_error_event, not_found, get_transid);
}
IFsFile* get(int64 cd_id, const std::string& key, int64 transid, const std::string& path,
bool prioritize_read, IFsFile* tmpl_file, bool allow_error_event, bool& not_found,
int64* get_transid=nullptr);
virtual int64 get_transid(const std::string& key, int64 transid) {
return get_transid(0, key, transid);
}
int64 get_transid(int64 cd_id, const std::string& key, int64 transid);
virtual bool reset(const std::string& key, int64 transid);
virtual bool put(const std::string& key, int64 transid, IFsFile* src,
const std::string& path, unsigned int flags,
bool allow_error_event, int64& compressed_size)
{
return put(0, key, transid, 0, src, path, flags,
allow_error_event, compressed_size);
}
bool put(int64 cd_id, const std::string& key, int64 transid, int64 generation, IFsFile* src,
const std::string& path, unsigned int flags,
bool allow_error_event, int64& compressed_size);
virtual int64 new_transaction(bool allow_error_event)
{
return new_transaction(0, allow_error_event);
}
int64 new_transaction(int64 cd_id, bool allow_error_event);
virtual bool transaction_finalize(int64 transid, bool complete, bool allow_error_event)
{
return transaction_finalize(0, transid, complete, allow_error_event);
}
bool transaction_finalize(int64 cd_id, int64 transid, bool complete, bool allow_error_event);
virtual bool set_active_transactions(const std::vector<int64>& active_transactions)
{
return set_active_transactions(0, active_transactions);
}
bool set_active_transactions(int64 cd_id, const std::vector<int64>& active_transactions);
virtual bool del(const std::vector<std::string>& keys, int64 transid)
{
return del(0, keys, transid);
}
virtual bool want_put_metadata()
{
return backend->want_put_metadata();
}
bool del(int64 cd_id, const std::vector<std::string>& keys, int64 transid);
virtual size_t max_del_size();
virtual int64 generation_inc( int64 inc );
int64 get_generation(int64 cd_id);
virtual std::string get_stats();
virtual bool sync();
virtual bool sync_db();
virtual bool sync_lock(IScopedWriteLock& lock);
virtual bool is_put_sync();
std::string prefixKey(const std::string& key);
static std::string encodeKey(const std::string& key, int64 transid);
static std::string encodeKey(int64 cd_id, const std::string& key, int64 transid);
IKvStoreBackend* getBackend();
void start_scrub(ScrubAction action, const std::string& position);
std::string scrub_position();
std::string scrub_stats();
void stop_scrub();
bool is_background_worker_enabled();
bool is_background_worker_running();
void enable_background_worker(bool b);
static void start_scrub_sync_test1();
static void start_scrub_sync_test2();
bool reupload(int64 transid_start, int64 transid_stop,
IKvStoreBackend* old_backend);
std::string meminfo();
void retry_all_deletion();
int64 get_total_balance_ops();
void incr_total_del_ops();
int64 get_total_del_ops();
virtual bool has_backend_key(const std::string& key, std::string& md5sum, bool update_md5sum);
bool log_del_mirror(const std::string& fn);
std::string next_log_del_mirror_item();
void set_backend_mirror_del_log_rpos(int64 p);
int64 get_backend_mirror_del_log_rpos();
void stop_defrag();
bool set_all_mirrored(bool b);
std::string mirror_stats();
bool start_defrag(const std::string& settings);
void operator()();
virtual bool fast_write_retry();
virtual bool submit_del_cd(int64 cd_id, IHasKeyCallback* p_has_key_callback, int64 ctransid, bool& need_flush);
virtual bool submit_del(IHasKeyCallback* p_has_key_callback, int64 ctransid, bool& need_flush) override;
virtual void submit_del_post_flush() override;
private:
bool backend_del_parallel(const std::vector<IKvStoreBackend::key_next_fun_t>& key_next_funs,
const std::vector<IKvStoreBackend::locinfo_next_fun_t>& locinfo_next_funs,
bool background_queue);
void add_last_modified_column();
void add_cd_id_tasks_column();
void add_created_column();
void add_mirrored_column();
int64 get_backend_mirror_del_log_wpos();
bool update_total_num(int64 num);
class BackgroundWorker : public IThread
{
public:
BackgroundWorker(IKvStoreBackend* backend, KvStoreFrontend* frontend,
bool manual_run, bool multi_trans_delete);
void operator()();
void quit();
void set_scrub_pause(bool b)
{
IScopedLock lock(pause_mutex.get());
scrub_pause = b;
}
void set_pause(bool b)
{
IScopedLock lock(pause_mutex.get());
pause = b;
}
void set_mirror_pause(bool b)
{
IScopedLock lock(pause_mutex.get());
mirror_pause = b;
}
bool get_pause()
{
IScopedLock lock(pause_mutex.get());
return pause || scrub_pause || mirror_pause;
}
bool is_paused()
{
IScopedLock lock(pause_mutex.get());
return paused;
}
int64 get_nwork()
{
IScopedLock lock(pause_mutex.get());
return nwork;
}
bool is_runnnig()
{
return running;
}
bool is_startup_finished()
{
IScopedLock lock(pause_mutex.get());
return startup_finished;
}
bool is_manual_run()
{
return manual_run;
}
std::string meminfo();
private:
bool removeOldObjects(KvStoreDao& dao, const std::vector<int64>& trans_ids, int64 cd_id);
bool removeTransaction(KvStoreDao& dao, int64 trans_id, int64 cd_id);
volatile bool do_quit;
bool pause;
bool scrub_pause;
bool mirror_pause;
bool paused;
bool manual_run;
bool multi_trans_delete;
int64 nwork;
bool startup_finished;
std::unique_ptr<IMutex> pause_mutex;
IKvStoreBackend* backend;
KvStoreFrontend* frontend;
int64 object_collector_size;
int64 object_collector_size_uncompressed;
relaxed_atomic<bool> running;
};
class ScrubQueue
{
std::unique_ptr<IMutex> mutex;
std::unique_ptr<ICondition> cond;
public:
relaxed_atomic<size_t> scrub_errors;
relaxed_atomic<size_t> scrub_oks;
relaxed_atomic<size_t> scrub_repaired;
ScrubQueue()
: mutex(Server->createMutex()),
cond(Server->createCondition()),
do_stop(false), error(false) {
}
void add(KvStoreDao::CdIterObject item)
{
IScopedLock lock(mutex.get());
items.push(item);
cond->notify_all();
}
KvStoreDao::CdIterObject get()
{
IScopedLock lock(mutex.get());
while (!do_stop
&& items.empty())
{
cond->wait(&lock);
}
if (do_stop
&& items.empty())
{
return KvStoreDao::CdIterObject();
}
KvStoreDao::CdIterObject ret = items.front();
items.pop();
return ret;
}
void stop()
{
IScopedLock lock(mutex.get());
do_stop = true;
cond->notify_all();
}
void set_error(bool b)
{
IScopedLock lock(mutex.get());
error = b;
}
bool has_error()
{
IScopedLock lock(mutex.get());
return error;
}
void reset()
{
IScopedLock lock(mutex.get());
do_stop = false;
error = false;
scrub_oks = 0;
scrub_errors = 0;
scrub_repaired = 0;
}
private:
bool do_stop;
bool error;
std::queue<KvStoreDao::CdIterObject> items;
};
class ScrubThread : public IThread
{
public:
ScrubThread(ScrubQueue& scrub_queue,
std::vector<KvStoreDao::CdIterObject>& new_md5sums,
IMutex* new_md5sums_mutex,
bool& has_changes,
relaxed_atomic<int64>& done_size,
bool with_last_modified,
ScrubAction scrub_action,
IKvStoreBackend* backend,
KvStoreFrontend* frontend,
IKvStoreBackend* backend_mirror)
: scrub_queue(scrub_queue), new_md5sums(new_md5sums),
has_changes(has_changes), done_size(done_size),
backend(backend), scrub_action(scrub_action),
frontend(frontend), new_md5sums_mutex(new_md5sums_mutex),
with_last_modified(with_last_modified), backend_mirror(backend_mirror) {
}
void operator()();
private:
bool& has_changes;
relaxed_atomic<int64>& done_size;
bool with_last_modified;
ScrubAction scrub_action;
ScrubQueue& scrub_queue;
std::vector<KvStoreDao::CdIterObject>& new_md5sums;
IMutex* new_md5sums_mutex;
IKvStoreBackend* backend;
KvStoreFrontend* frontend;
IKvStoreBackend* backend_mirror;
};
class ScrubWorker : public IThread
{
public:
ScrubWorker(ScrubAction balance, IKvStoreBackend* backend,
IKvStoreBackend* backend_mirror,
KvStoreFrontend* frontend, BackgroundWorker& background_worker,
const std::string& position, bool has_last_modified);
void operator()();
void quit()
{
IScopedLock lock(mutex.get());
do_quit = true;
}
std::string stats()
{
return "{ \"done_size\": " + convert(done_size) + "\n,"
"\"total_size\": " + convert(total_size) + "\n,"
"\"paused\": " + convert(curr_paused ? 1 : 0) + "\n,"
"\"complete_pc\": " + convert(complete_pc) + " }\n";
}
ScrubAction get_scrub_action()
{
return scrub_action;
}
std::string get_position()
{
IScopedLock lock(mutex.get());
return position;
}
void add_deleted_objects(int64 cd_id, const std::vector<std::pair<int64, std::string> >& toadd)
{
//TODO: handle cd_id
IScopedLock lock(mutex.get());
for (auto& it : toadd) {
deleted_objects.insert(it);
}
}
std::string meminfo();
THREADPOOL_TICKET ticket;
private:
void set_allow_defrag(bool b);
bool do_quit;
bool has_last_modified;
relaxed_atomic<int> complete_pc;
relaxed_atomic<int64> done_size;
relaxed_atomic<int64> total_size;
relaxed_atomic<bool> curr_paused;
std::string position;
std::unique_ptr<IMutex> mutex;
ScrubAction scrub_action;
IKvStoreBackend* backend;
IKvStoreBackend* backend_mirror;
KvStoreFrontend* frontend;
BackgroundWorker& background_worker;
std::set<std::pair<int64, std::string> > deleted_objects;
};
class PutDbWorker : public IThread
{
public:
PutDbWorker(std::string db_path);
int64 add(int64 cd_id, int64 transid, const std::string& key, int64 generation);
void add(int64 cd_id, int64 transid, const std::string& key,
const std::string& md5sum, int64 size, int64 last_modified, int64 generation);
void flush();
void update(int64 cd_id, int64 objectid, int64 size,
const std::string& md5sum, int64 last_modified);
void quit();
void set_with_last_modified(bool b) {
with_last_modified = b;
}
void set_do_synchonous(bool b) {
do_synchronous = b;
}
void set_db_wal_file(IFsFile* f) {
db_wal_file = f;
}
void operator()();
std::string meminfo();
private:
void wait_queue(IScopedLock& lock);
void worker_abort();
std::unique_ptr<IMutex> mutex;
std::unique_ptr<ICondition> add_cond;
std::unique_ptr<ICondition> commit_cond;
enum class EType
{
Add,
Add2,
Update,
Flush
};
struct SItem
{
SItem(int64 cd_id, int64 transid, const std::string& key, int64 generation, int64* rowid)
: type(EType::Add),
cd_id(cd_id), transid(transid), key(key), rowid(rowid), generation(generation)
{}
SItem(int64 cd_id, int64 transid, const std::string& key,
const std::string& md5sum, int64 size, int64 last_modified, int64 generation)
: type(EType::Add2),
cd_id(cd_id), transid(transid), key(key), md5sum(md5sum), size(size),
last_modified(last_modified), generation(generation)
{}
SItem(int64 cd_id, int64 objectid, int64 size, const std::string& md5sum,
int64 last_modified)
: type(EType::Update),
cd_id(cd_id), objectid(objectid), transid(size), key(md5sum),
last_modified(last_modified)
{}
explicit SItem(int64* rowid)
: type(EType::Flush),
rowid(rowid)
{}
EType type;
int64 cd_id;
int64 transid;
std::string key;
int64 size;
int64 last_modified;
std::string md5sum;
int64* rowid;
int64 objectid;
int64 generation;
};
std::vector<SItem> items_a;
std::vector<SItem> items_b;
std::vector<SItem>* curr_items;
bool do_quit;
bool do_synchronous;
IFsFile* db_wal_file;
bool with_last_modified;
std::string db_path;
};
class MirrorWorker : public IThread
{
public:
MirrorWorker(KvStoreFrontend* frontend,
BackgroundWorker& background_worker)
: frontend(frontend),
background_worker(background_worker),
do_quit(false),
mutex(Server->createMutex())
{}
void operator()();
void quit()
{
IScopedLock lock(mutex.get());
do_quit = true;
}
THREADPOOL_TICKET ticket;
private:
bool do_quit;
std::unique_ptr<IMutex> mutex;
BackgroundWorker& background_worker;
KvStoreFrontend* frontend;
};
class MirrorThread : public IThread
{
public:
MirrorThread(std::vector<KvStoreDao::CdIterObject2>& objs, IPipe* rpipe,
IKvStoreBackend* backend,
IKvStoreBackend* backend_mirror,
KvStoreFrontend* frontend)
: objs(objs), rpipe(rpipe),
backend(backend),
backend_mirror(backend_mirror),
frontend(frontend),
has_error(false) {}
void operator()();
bool get_has_error() {
return has_error;
}
private:
std::vector<KvStoreDao::CdIterObject2>& objs;
IPipe* rpipe;
IKvStoreBackend* backend;
IKvStoreBackend* backend_mirror;
KvStoreFrontend* frontend;
bool has_error;
};
IKvStoreBackend* backend;
IKvStoreBackend* backend_mirror;
std::string mirror_window;
std::unique_ptr<IMutex> mirror_del_log_mutex;
std::unique_ptr<IFsFile> backend_mirror_del_log;
int64 backend_mirror_del_log_wpos;
int64 backend_mirror_del_log_rpos;
std::unique_ptr<IFsFile> db_wal_file;
IDatabase* getDatabase();
bool importFromBackend(KvStoreDao& dao);
BackgroundWorker background_worker;
THREADPOOL_TICKET background_worker_ticket;
MirrorWorker mirror_worker;
THREADPOOL_TICKET mirror_worker_ticket;
std::string empty_file_path;
std::unique_ptr<IMutex> scrub_mutex;
ScrubWorker* scrub_worker;
PutDbWorker put_db_worker;
THREADPOOL_TICKET put_db_worker_ticket;
bool with_prefix;
std::unique_ptr<IMutex> gen_mutex;
int64 current_generation;
int64 last_persisted_generation;
int64 last_update_generation;
std::unique_ptr<IMutex> unsynced_keys_mutex;
struct SUnsyncedKey
{
SUnsyncedKey()
: transid(0) { }
SUnsyncedKey(int64 transid,
std::string md5sum)
: transid(transid),
md5sum(md5sum) { }
int64 transid;
std::string md5sum;
};
std::map<std::pair<int64, std::string>, SUnsyncedKey> unsynced_keys_a;
std::map<std::pair<int64, std::string>, SUnsyncedKey> unsynced_keys_b;
std::map<std::pair<int64, std::string>, SUnsyncedKey>* curr_unsynced_keys;
std::map<std::pair<int64, std::string>, SUnsyncedKey>* other_unsynced_keys;
std::unique_ptr<ISharedMutex> put_shared_mutex;
static bool scrub_sync_test1;
static bool scrub_sync_test2;
bool has_last_modified;
relaxed_atomic<int64> total_balance_ops;
relaxed_atomic<int64> total_del_ops;
std::string db_path;
relaxed_atomic<int64> objects_total_size;
relaxed_atomic<int64> objects_total_num;
relaxed_atomic<bool> objects_init_complete;
THREADPOOL_TICKET objects_init_ticket;
int64 task_delay;
// Inherited via IOnlineKvStore
virtual int64 get_uploaded_bytes() override;
virtual int64 get_downloaded_bytes() override;
relaxed_atomic<int> mirror_state;
relaxed_atomic<int64> mirror_curr_pos;
relaxed_atomic<int64> mirror_curr_total;
relaxed_atomic<int64> mirror_items;
bool allow_import;
IBackupFileSystem* cachefs;
};

View File

@ -0,0 +1,146 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#ifdef WITH_LZMA
#include "LzmaCompressor.h"
LzmaCompressor::LzmaCompressor()
{
memset(&strm, 0, sizeof(lzma_stream));
lzma_ret ret = lzma_easy_encoder(&strm, 5, LZMA_CHECK_CRC32);
if (ret != LZMA_OK)
{
Server->Log("Error initializing LZMA", LL_ERROR);
throw std::runtime_error("Error initializing LZMA");
}
}
LzmaCompressor::~LzmaCompressor()
{
lzma_end(&strm);
}
void LzmaCompressor::setOut(char * next_out, size_t avail_out)
{
strm.next_out = reinterpret_cast<uint8_t*>(next_out);
strm.avail_out = avail_out;
}
void LzmaCompressor::setIn(char * next_in, size_t avail_in)
{
strm.next_in = reinterpret_cast<uint8_t*>(next_in);
strm.avail_in = avail_in;
}
size_t LzmaCompressor::getAvailOut()
{
return strm.avail_out;
}
size_t LzmaCompressor::getAvailIn()
{
return strm.avail_in;
}
CompressResult LzmaCompressor::compress(bool finish, int& code)
{
lzma_ret ret = lzma_code(&strm, finish ? LZMA_FINISH : LZMA_RUN);
code = ret;
if (ret == LZMA_OK)
{
return CompressResult_Ok;
}
else if(ret== LZMA_STREAM_END)
{
return CompressResult_End;
}
else
{
return CompressResult_Other;
}
}
unsigned int LzmaCompressor::getId()
{
return CompressionLzma5;
}
LzmaDecompressor::LzmaDecompressor()
{
memset(&strm, 0, sizeof(lzma_stream));
lzma_ret ret = lzma_stream_decoder(&strm, UINT64_MAX, 0);
if (ret != LZMA_OK)
{
Server->Log("Error initializing LZMA", LL_ERROR);
throw std::runtime_error("Error initializing LZMA");
}
}
LzmaDecompressor::~LzmaDecompressor()
{
lzma_end(&strm);
}
void LzmaDecompressor::setOut(char * next_out, size_t avail_out)
{
strm.next_out = reinterpret_cast<uint8_t*>(next_out);
strm.avail_out = avail_out;
}
void LzmaDecompressor::setIn(char * next_in, size_t avail_in)
{
strm.next_in = reinterpret_cast<uint8_t*>(next_in);
strm.avail_in = avail_in;
}
size_t LzmaDecompressor::getAvailOut()
{
return strm.avail_out;
}
size_t LzmaDecompressor::getAvailIn()
{
return strm.avail_in;
}
DecompressResult LzmaDecompressor::decompress(int & code)
{
lzma_ret ret = lzma_code(&strm, LZMA_RUN);
code = ret;
if (ret == LZMA_OK)
{
return DecompressResult_Ok;
}
else if (ret == LZMA_STREAM_END)
{
return DecompressResult_End;
}
else
{
return DecompressResult_Other;
}
}
#endif //WITH_LZMA

View File

@ -0,0 +1,40 @@
#pragma once
#ifdef WITH_LZMA
#include <lzma.h>
#include "CompressEncrypt.h"
class LzmaCompressor : public ICompressor
{
public:
LzmaCompressor();
~LzmaCompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual CompressResult compress(bool finish, int& code);
virtual unsigned int getId();
private:
lzma_stream strm;
};
class LzmaDecompressor : public IDecompressor
{
public:
LzmaDecompressor();
~LzmaDecompressor();
// Inherited via ICompressor
virtual void setOut(char * next_out, size_t avail_out);
virtual void setIn(char * next_in, size_t avail_in);
virtual size_t getAvailOut();
virtual size_t getAvailIn();
virtual DecompressResult decompress(int& code);
private:
lzma_stream strm;
};
#endif //WITH_LZMA

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,801 @@
#pragma once
#include <list>
#include <atomic>
#include "../common/lrucache.h"
#include "../Interface/Thread.h"
#include "../Interface/Types.h"
#include "../Interface/Mutex.h"
#include "../Interface/BackupFileSystem.h"
#include <set>
#include "../Interface/Condition.h"
#include <memory>
#include <deque>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include "IOnlineKvStore.h"
#include "../urbackupcommon/os_functions.h"
#ifdef HAS_ASYNC
#include "fuse_io_context.h"
#endif
#include "../common/relaxed_atomic.h"
class IFsFile;
class ICompressEncryptFactory;
#ifndef NDEBUG
#define DIRTY_ITEM_CHECK
#endif
#ifdef DIRTY_ITEM_CHECK
#define DIRTY_ITEM(x) x
#else
#define DIRTY_ITEM(x)
#endif
#ifndef HAS_ASYNC
class fuse_io_context
{
public:
static bool is_sync_thread() {
return true;
}
};
#endif
namespace
{
class SubmitWorker;
class ReadOnlyFileWrapper;
class Bitmap;
enum ESubmissionAction
{
SubmissionAction_Working_Evict,
SubmissionAction_Working_Dirty,
SubmissionAction_Working_Delete,
SubmissionAction_Working_Compress,
SubmissionAction_Evict,
SubmissionAction_Dirty,
SubmissionAction_Delete,
SubmissionAction_Compress
};
struct SSubmissionItem
{
SSubmissionItem()
: finish(true)
{}
std::string key;
std::vector<std::string> keys;
ESubmissionAction action;
int64 transid;
int64 size;
bool compressed;
bool finish;
};
struct SFdKey
{
SFdKey(IFsFile* fd, int64 size, bool read_only)
: fd(fd), size(size), read_only(read_only), refcount(1)
{
}
SFdKey()
: fd(NULL), size(0), read_only(true), refcount(1)
{
}
IFsFile* fd;
int64 size;
bool read_only;
int refcount;
bool operator<(const SFdKey& other) const
{
if(other.fd==fd)
{
return read_only<other.read_only;
}
else
{
return fd < other.fd;
}
}
bool operator==(const SFdKey& other) const
{
return other.fd==fd && other.read_only==read_only;
}
};
struct SubmittedItem
{
std::string key;
int64 submittime;
};
struct SMemFile
{
SMemFile(IFsFile* file, const std::string& key, int64 size)
: file(file), size(size), compsize(-1), evicted(false), cow(false), key(key) {}
SMemFile()
: file(nullptr), size(0), compsize(-1), evicted(false), cow(false) {}
std::shared_ptr<IFsFile> file;
std::shared_ptr<IFsFile> old_file;
int64 size;
int64 compsize;
bool evicted;
bool cow;
std::string key;
};
}
#ifndef NDEBUG
class test_mutex : public std::mutex
{
public:
test_mutex() noexcept
: std::mutex() {}
test_mutex(const test_mutex&) = delete;
test_mutex& operator=(const test_mutex&) = delete;
void lock() {
std::mutex::lock();
if (lid != std::thread::id())
abort();
lid = std::this_thread::get_id();
}
void unlock() {
if (lid != std::this_thread::get_id())
abort();
lid = std::thread::id();
std::mutex::unlock();
}
bool has_lock() {
return lid == std::this_thread::get_id();
}
private:
std::thread::id lid = std::thread::id();
};
using cache_mutex_t = test_mutex;
#else
using cache_mutex_t = std::recursive_mutex;
#endif
class TransactionalKvStore : public IThread
{
class SubmitWorker;
class RetrievalOperation;
class RetrievalOperationNoLock;
class RetrievalOperationUnlockOnly;
public:
struct SCacheVal
{
SCacheVal()
: dirty(false), chances(0)
{}
bool dirty : 1;
unsigned char chances : 7;
};
class INumSecondChancesCallback
{
public:
virtual unsigned int get_num_second_chances(const std::string& key) = 0;
virtual bool is_metadata(const std::string& key) = 0;
};
TransactionalKvStore(IBackupFileSystem* cachefs, int64 min_cachesize, int64 min_free_size, int64 critical_free_size,
int64 throttle_free_size, int64 min_metadata_cache_free, float comp_percent, int64 comp_start_limit,
IOnlineKvStore* online_kv_store, const std::string& encryption_key, ICompressEncryptFactory* compress_encrypt_factory, bool verify_cache,
float cpu_multiplier, size_t no_compress_mult, bool with_prev_link, bool allow_evict, bool with_submitted_files,
float resubmit_compressed_ratio, int64 max_memfile_size, std::string memcache_path,
float memory_usage_factor, bool only_memfiles, unsigned int background_comp_method);
~TransactionalKvStore();
enum class BitmapInfo
{
Present,
NotPresent,
Unknown
};
class Flag
{
public:
static constexpr unsigned int disable_fd_cache = 1;
static constexpr unsigned int disable_throttling = 2;
static constexpr unsigned int prioritize_read = 4;
static constexpr unsigned int read_random = 8;
static constexpr unsigned int read_only = 16;
static constexpr unsigned int preload_once = 32;
static constexpr unsigned int disable_memfiles = 64;
};
IFsFile* get(const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag=0);
void release(const std::string& key);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> release_async(fuse_io_context& io, const std::string& key);
#endif
bool del(const std::string& key);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> del_async(fuse_io_context& io, const std::string key);
#endif
bool set_second_chances(const std::string& key, unsigned int chances);
bool has_preload_once(const std::string& key);
bool has_item_cached(const std::string& key);
void remove_preload_items(int preload_tag);
void dirty_all();
bool checkpoint(bool do_submit, size_t checkpoint_retry_n);
void operator()();
void stop();
int64 get_dirty_bytes();
int64 get_submitted_bytes();
int64 get_total_submitted_bytes();
std::map<int64, size_t> get_num_dirty_items();
std::map<int64, size_t> get_num_memfile_items();
int64 get_cache_size();
int64 get_comp_bytes();
void reset();
bool is_congested();
bool is_congested_nolock();
bool is_congested_async();
int64 get_memfile_bytes();
int64 get_submitted_memfile_bytes();
bool is_memfile_complete(int64 ttransid);
std::string meminfo();
void shrink_mem();
int64 get_total_hits();
int64 get_total_hits_async();
int64 get_total_memory_hits();
int64 get_total_memory_hits_async();
int64 get_total_cache_miss_backend();
int64 get_total_cache_miss_decompress();
int64 get_total_dirty_ops();
int64 get_total_put_ops();
int64 get_total_compress_ops();
void disable_compression(int64 disablems);
void set_num_second_chances_callback(INumSecondChancesCallback* cb);
int64 get_transid();
int64 get_basetransid();
void set_max_cachesize(int64 ns);
int64 cache_total_space();
void set_disable_read_memfiles(bool b);
void set_disable_write_memfiles(bool b);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<IFsFile*> get_async(fuse_io_context& io, const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag=0);
#endif
void check_mutex_not_held();
private:
IFsFile* get_internal(const std::string& key,
BitmapInfo bitmap_present, unsigned int flags, int64 size_hint,
int preload_tag);
IFsFile* get_retrieve(const std::string& key, BitmapInfo bitmap_present, unsigned int flags,
int64 size_hint, SFdKey* res, bool& cache_fd, std::unique_lock<cache_mutex_t>& cache_lock);
int64 set_active_transactions(std::unique_lock<cache_mutex_t>& cache_lock, bool continue_incomplete);
void cleanup(bool init);
void read_keys(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& tpath, bool verify);
void read_missing();
void remove_transaction(int64 transid);
std::string keypath2(const std::string& key, int64 transaction_id);
std::string transpath2();
std::string basepath2();
std::string hex(const std::string& key);
std::string hexpath(const std::string& key);
bool evict_one(std::unique_lock<cache_mutex_t>& cache_lock, bool break_on_skip, bool only_non_dirty,
std::list<std::pair<std::string const *, SCacheVal> >::iterator& evict_it,
common::lrucache<std::string, SCacheVal>& target_cache, bool use_chances,
int64& freed_space,
bool& run_del_items, std::vector<std::list<std::pair<std::string const *, SCacheVal> >::iterator>& move_front,
bool& used_chance);
void evict_move_front(common::lrucache<std::string, SCacheVal>& target_cache,
std::vector<std::list<std::pair<std::string const *, SCacheVal> >::iterator>& move_front);
void evict_item(const std::string& key, bool dirty,
common::lrucache<std::string, SCacheVal>& target_cache,
std::list<std::pair<std::string const *, SCacheVal> >::iterator* evict_it,
std::unique_lock<cache_mutex_t>& cache_lock, const std::string& from, int64& freed_space);
bool evict_memfiles(std::unique_lock<cache_mutex_t>& cache_lock, bool evict_dirty);
void check_deleted(int64 transid, const std::string& key, bool comp);
bool compress_one(std::unique_lock<cache_mutex_t>& cache_lock,
std::list<std::pair<std::string const *, SCacheVal> >::iterator& compress_it);
std::list<SSubmissionItem>::iterator next_submission_item(bool no_compress, bool prefer_non_delete, bool prefer_mem, std::string& path, bool& p_do_stop, SMemFile*& memf);
std::list<SSubmissionItem>::iterator no_submission_item();
bool item_submitted(std::list<SSubmissionItem>::iterator it, bool can_delete, bool is_memf);
bool item_compressed(std::list<SSubmissionItem>::iterator it, bool compression_error, int64 size_diff, int64 add_comp_bytes, bool is_memf);
enum class DeleteImm
{
None,
NoUnlock,
Unlock
};
void delete_item(fuse_io_context* io, const std::string& key, bool compressed_item,
std::unique_lock<cache_mutex_t>& cache_lock,
int64 force_delete_transid=0, int64 skip_transid=0, DeleteImm del_imm = DeleteImm::None,
int64 delete_only=0, bool rm_submitted=false);
void run_del_file_queue();
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> run_del_file_queue_async(fuse_io_context& io);
#endif
void wait_for_del_file(const std::string& fn);
bool read_dirty_items(std::unique_lock<cache_mutex_t>& cache_lock, int64 transaction_id, int64 attibute_to_trans_id);
bool read_submitted_evicted_files(const std::string& fn, std::set<std::string>& sub_evict);
bool read_from_dirty_file(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& fn, int64 transaction_id, bool do_submit, int64 nosubmit_transid);
bool write_to_dirty_file(std::unique_lock<cache_mutex_t>& cache_lock, const std::string& fn, bool do_submit, int64 new_trans,
size_t& num_memf_dirty, std::map<std::string, size_t>& memf_dirty_items,
std::map<std::string, int64>& memf_dirty_items_size);
bool read_from_deleted_file(const std::string& fn, int64 transaction_id, bool do_submit);
bool write_to_deleted_file(const std::string& fn, bool do_submit);
void submit_dummy(int64 transaction_id);
void clean_snapshot( int64 new_trans );
void wait_for_all_retrievals(std::unique_lock<cache_mutex_t>& lock);
bool wait_for_retrieval(std::unique_lock<cache_mutex_t>& lock, const std::string& key);
bool wait_for_retrieval_poll(std::unique_lock<cache_mutex_t>& lock, const std::string& key);
struct RetrievalRes
{
RetrievalRes()
: waited(false)
{}
std::unique_lock<cache_mutex_t> lock;
bool waited;
};
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<RetrievalRes> wait_for_retrieval_async(fuse_io_context& io, std::unique_lock<cache_mutex_t> lock, const std::string key);
#endif
void initiate_retrieval(const std::string& key);
void finish_retrieval(const std::string& key);
#ifdef HAS_ASYNC
void finish_retrieval_async(fuse_io_context& io, std::unique_lock<cache_mutex_t> lock, const std::string& key);
#endif
bool compress_item(const std::string& key, int64 transaction_id, IFsFile* src, int64& size_diff, int64& dst_size, bool sync);
bool decompress_item(const std::string& key, int64 transaction_id, IFsFile* tmpl_file, int64& size_diff, int64& src_size, bool sync);
void remove_compression_evicition_submissions(std::unique_lock<cache_mutex_t>& cache_lock);
void remove_curr_trans_submission(std::unique_lock<cache_mutex_t>& cache_lock, std::list<SSubmissionItem>::iterator it);
void wait_for_compressions_evictions(std::unique_lock<cache_mutex_t>& lock);
void addDirtyItem(int64 transid, std::string key, bool with_stats=true);
void removeDirtyItem(int64 transid, std::string key);
void add_dirty_bytes(int64 transid, std::string key, int64 b);
void rm_dirty_bytes(int64 transid, std::string key, int64 b, bool rm, bool change_size=true);
void add_cachesize(int64 toadd);
void sub_cachesize(int64 tosub);
bool is_sync_wait_item(const std::string& key);
bool is_sync_wait_item(const std::string& key, int64 transid);
void check_submission_items();
void update_transactions();
void drop_cache(IFsFile* fd);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> drop_cache_async(fuse_io_context& io, IFsFile* fd);
#endif
void set_read_random(IFsFile* fd);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> set_read_random_async(fuse_io_context& io, IFsFile* fd);
#endif
bool add_submitted_item(int64 transid, std::string tkey, std::unique_ptr<IFile>* fd_cache);
bool add_evicted_item(int64 transid, std::string tkey);
bool add_item(std::string fn, int64 transid, std::string tkey, std::unique_ptr<IFile>* fd_cache);
IFsFile* get_mem_file(const std::string& key, int64 size_hint, bool for_read);
bool has_memfile_stat(const std::string& key);
void add_memfile_stat(const std::string& key);
bool rm_mem_file(fuse_io_context* io, int64 transid, const std::string& key, bool rm_submitted);
void rm_submission_item(std::map<std::pair<int64, std::string>, std::list<SSubmissionItem>::iterator >::iterator it);
bool cow_mem_file(SMemFile* memf, bool with_old_file);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<bool> cow_mem_file_async(fuse_io_context& io, SMemFile* memf, bool with_old_file);
#endif
void remove_missing(const std::string& key);
int64 get_compsize(const std::string& key, int64 transid);
void only_memfiles_throttle(const std::string& key, std::unique_lock<cache_mutex_t>& lock);
#ifdef HAS_ASYNC
fuse_io_context::io_uring_task<void> only_memfiles_throttle_async(fuse_io_context& io, const std::string key);
#endif
std::string readCacheFile(const std::string& fn);
bool cacheFileExists(const std::string& fn);
bool writeToCacheFile(const std::string& str, const std::string& fn);
TransactionalKvStore::SCacheVal cache_val(const std::string& key, bool dirty);
TransactionalKvStore::SCacheVal cache_val_nc(bool dirty);
std::list<SSubmissionItem>::iterator submission_queue_add(SSubmissionItem& item, bool memfile);
std::list<SSubmissionItem>::iterator submission_queue_insert(SSubmissionItem& item, bool memfile, std::list<SSubmissionItem>::iterator it);
void submission_queue_rm(std::list<SSubmissionItem>::iterator it);
int64 cache_free_space();
class RegularSubmitBundleThread : public IThread
{
TransactionalKvStore* kv_store;
bool do_quit;
std::condition_variable_any cond;
void regular_submit_bundle(std::unique_lock<cache_mutex_t>& cache_lock);
public:
RegularSubmitBundleThread(TransactionalKvStore* kv_store)
:kv_store(kv_store), do_quit(false) {}
void operator()();
void quit();
};
class MetadataUpdateThread : public IThread
{
public:
TransactionalKvStore* kv_store;
bool do_quit;
std::condition_variable_any cond;
MetadataUpdateThread(TransactionalKvStore* kv_store)
:kv_store(kv_store), do_quit(false) {}
void operator()();
void quit();
};
class ThrottleThread : public IThread
{
public:
TransactionalKvStore* kv_store;
bool do_quit;
std::condition_variable_any cond;
ThrottleThread(TransactionalKvStore* kv_store)
:kv_store(kv_store), do_quit(false) {}
void operator()();
void quit();
};
class MemfdDelThread : public IThread
{
std::mutex mutex;
std::condition_variable cond;
std::vector<std::shared_ptr<IFsFile> > del_fds;
bool do_quit = false;
public:
MemfdDelThread() {
}
void quit() {
std::scoped_lock lock(mutex);
do_quit = true;
cond.notify_all();
}
void del(std::shared_ptr<IFsFile>&& fd)
{
std::unique_lock lock(mutex);
while (del_fds.size() > 1000)
{
lock.unlock();
Server->wait(100);
lock.lock();
}
del_fds.push_back(fd);
cond.notify_all();
}
void operator()();
};
#ifdef HAS_ASYNC
struct AwaiterCoList
{
AwaiterCoList* next;
std::coroutine_handle<> awaiter;
};
AwaiterCoList* retrieval_waiters_head = nullptr;
struct RetrievalAwaiter
{
RetrievalAwaiter(TransactionalKvStore& kv_store)
: kv_store(kv_store) {}
RetrievalAwaiter(RetrievalAwaiter const&) = delete;
RetrievalAwaiter(RetrievalAwaiter&& other) = delete;
RetrievalAwaiter& operator=(RetrievalAwaiter&&) = delete;
RetrievalAwaiter& operator=(RetrievalAwaiter const&) = delete;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(std::coroutine_handle<> p_awaiter) noexcept
{
kv_store.check_mutex_not_held();
awaiter.awaiter = p_awaiter;
awaiter.next = kv_store.retrieval_waiters_head;
kv_store.retrieval_waiters_head = &awaiter;
}
void await_resume() const noexcept
{
}
private:
AwaiterCoList awaiter;
TransactionalKvStore& kv_store;
};
void resume_retrieval_awaiters()
{
AwaiterCoList* curr = retrieval_waiters_head;
retrieval_waiters_head = nullptr;
while (curr != nullptr)
{
AwaiterCoList* next = curr->next;
curr->awaiter.resume();
curr = next;
}
}
#endif
common::lrucache<std::string, SCacheVal> lru_cache;
std::map<std::string, SFdKey> open_files;
std::map<IFsFile*, ReadOnlyFileWrapper*> read_only_open_files;
std::map<std::string, int> preload_once_items;
std::map<std::string, int64> preload_once_delayed_removal;
std::list<SSubmissionItem> submission_queue;
std::list<SSubmissionItem>::iterator submission_queue_memfile_first;
std::map<std::pair<int64, std::string>, std::list<SSubmissionItem>::iterator > submission_items;
std::map<int64, size_t> num_dirty_items;
#ifdef DIRTY_ITEM_CHECK
std::map<int64, std::map<std::string, size_t> > dirty_items;
std::map<int64, std::map<std::string, int64> > dirty_items_size;
#endif
std::map<int64, size_t> num_delete_items;
common::lrucache<std::string, SFdKey> fd_cache;
cache_mutex_t cache_mutex;
std::recursive_mutex submission_mutex;
std::recursive_mutex dirty_item_mutex;
std::condition_variable_any evict_cond;
std::set<std::string> queued_dels;
std::map<std::string, size_t> in_retrieval;
std::condition_variable_any retrieval_cond;
common::lrucache<std::string, SCacheVal> compressed_items;
std::set<std::string> dirty_evicted_items;
std::map<int64, std::set<std::string> > nosubmit_dirty_items;
std::set<std::string> nosubmit_untouched_items;
std::set<std::string> del_file_queue;
std::string prio_del_file;
std::unique_ptr<ICondition> prio_del_file_cond;
std::unique_ptr<IMutex> del_file_mutex;
std::unique_ptr<IMutex> del_file_single_mutex;
std::vector<SFile> transactions;
std::unique_ptr<IMutex> evicted_mutex;
std::recursive_mutex memfiles_mutex;
common::lrucache<std::pair<int64, std::string>, SMemFile> memfiles;
std::vector<std::pair<int64, std::unique_ptr<Bitmap> > > memfile_stat_bitmaps;
std::map<int64, size_t> num_mem_files;
relaxed_atomic<int64> memfile_size;
relaxed_atomic<int64> submitted_memfile_size;
int64 submit_bundle_starttime;
std::unique_ptr<IMutex> submit_bundle_item_mutex;
std::vector<std::pair<SSubmissionItem, bool> > submit_bundle;
std::set<std::pair<std::string, int64> > submit_bundle_items_a;
std::set<std::pair<std::string, int64> > submit_bundle_items_b;
std::set<std::pair<std::string, int64> >* curr_submit_bundle_items;
std::set<std::pair<std::string, int64> >* other_submit_bundle_items;
std::set<std::string> missing_items;
std::vector<THREADPOOL_TICKET> threads;
IBackupFileSystem* cachefs;
relaxed_atomic<int64> cachesize;
relaxed_atomic<int64> dirty_bytes;
relaxed_atomic<int64> comp_bytes;
relaxed_atomic<int64> submitted_bytes;
relaxed_atomic<int64> total_submitted_bytes;
int64 total_hits;
int64 total_memory_hits;
relaxed_atomic<int64> total_dirty_ops;
relaxed_atomic<int64> total_cache_miss_backend;
relaxed_atomic<int64> total_cache_miss_decompress;
relaxed_atomic<int64> total_put_ops;
relaxed_atomic<int64> total_compress_ops;
relaxed_atomic<int64> max_cachesize;
int64 transid;
int64 basetrans;
int64 min_cachesize;
int64 min_free_size;
int64 throttle_free_size;
int64 critical_free_size;
float comp_percent;
int64 comp_start_limit;
bool curr_submit_compress_evict;
float resubmit_compressed_ratio;
int64 max_memfile_size;
size_t submitted_memfiles;
size_t remaining_gets;
bool has_new_remaining_gets = false;
size_t new_remaining_gets = 0;
size_t unthrottled_gets;
double unthrottled_gets_avg;
bool do_evict;
int64 do_evict_starttime;
bool do_stop;
size_t evict_queue_depth;
size_t compress_queue_depth;
relaxed_atomic<int64> metadata_cache_free;
int64 min_metadata_cache_free;
IOnlineKvStore* online_kv_store;
std::string encryption_key;
ICompressEncryptFactory* compress_encrypt_factory;
RegularSubmitBundleThread regular_submit_bundle_thread;
ThrottleThread throttle_thread;
MetadataUpdateThread metadata_update_thread;
MemfdDelThread memfd_del_thread;
bool with_sync_wait;
bool with_prev_link;
bool allow_evict;
bool with_submitted_files;
size_t fd_cache_size;
bool evict_non_dirty_memfiles;
int64 compression_starttime;
INumSecondChancesCallback* num_second_chances_cb;
bool only_memfiles;
bool disable_read_memfiles;
bool disable_write_memfiles;
unsigned int background_comp_method;
size_t retrieval_waiters_async;
size_t retrieval_waiters_sync;
std::string memcache_path;
};

View File

@ -0,0 +1,196 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Debug|x64">
<Configuration>Debug</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|x64">
<Configuration>Release</Configuration>
<Platform>x64</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<VCProjectVersion>16.0</VCProjectVersion>
<Keyword>Win32Proj</Keyword>
<ProjectGuid>{3b050c2b-be6e-4837-a81c-b07abf8f9e18}</ProjectGuid>
<RootNamespace>clouddrive</RootNamespace>
<WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration">
<ConfigurationType>DynamicLibrary</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v142</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="Shared">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<LinkIncremental>false</LinkIncremental>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;_DEBUG;CLOUDDRIVE_EXPORTS;_WINDOWS;_USRDLL;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>Use</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableUAC>false</EnableUAC>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>WIN32;NDEBUG;CLOUDDRIVE_EXPORTS;_WINDOWS;_USRDLL;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>Use</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableUAC>false</EnableUAC>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;CLOUDDRIVE_EXPORTS;_WINDOWS;_USRDLL;_SILENCE_CXX17_OLD_ALLOCATOR_MEMBERS_DEPRECATION_WARNING;_SILENCE_CXX17_ITERATOR_BASE_CLASS_DEPRECATION_WARNING;OS_FUNC_NO_NET;USE_IMPORT_EXPORT;USE_WINDOWS_DLL_SEMANTICS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
<AdditionalOptions>/std:c++latest %(AdditionalOptions)</AdditionalOptions>
<AdditionalIncludeDirectories>C:\dev\aws-sdk-cpp-1.8.186\aws-cpp-sdk-s3\include;C:\dev\aws-sdk-cpp-1.8.186\aws-cpp-sdk-core\include;$(ZlibIncludeDir);$(SolutionDir)/deps/include/zlib;$(SolutionDir)/deps/include/zstd;$(ZstdIncludeDir);$(CryptoppIncludeDir);$(SolutionDir)/deps/include/cryptopp;</AdditionalIncludeDirectories>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableUAC>false</EnableUAC>
<AdditionalDependencies>aws-cpp-sdk-core.lib;aws-cpp-sdk-s3.lib;cryptlibd_x86_64.lib;zlibd_x86_64.lib;%(AdditionalDependencies);libzstd_x86_64.lib</AdditionalDependencies>
<AdditionalLibraryDirectories>$(ZlibLibDir);$(SolutionDir)/deps/libs;$(ZstdLibDir);$(CryptoppLibDir);C:\dev\aws-sdk-cpp-1.8.186-build\aws-cpp-sdk-s3\Debug;C:\dev\aws-sdk-cpp-1.8.186-build\aws-cpp-sdk-core\Debug</AdditionalLibraryDirectories>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>NDEBUG;CLOUDDRIVE_EXPORTS;_WINDOWS;_USRDLL;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>Use</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
</ClCompile>
<Link>
<SubSystem>Windows</SubSystem>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableUAC>false</EnableUAC>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\common\data.cpp" />
<ClCompile Include="..\md5.cpp" />
<ClCompile Include="..\stringtools.cpp" />
<ClCompile Include="..\urbackupcommon\json.cpp" />
<ClCompile Include="..\urbackupcommon\os_functions_win.cpp" />
<ClCompile Include="..\urbackupcommon\WalCheckpointThread.cpp" />
<ClCompile Include="CdZlibCompressor.cpp" />
<ClCompile Include="CdZstdCompressor.cpp" />
<ClCompile Include="ClouddriveFactory.cpp" />
<ClCompile Include="CloudFile.cpp" />
<ClCompile Include="CompressEncrypt.cpp" />
<ClCompile Include="dllmain.cpp" />
<ClCompile Include="KvStoreBackendS3.cpp" />
<ClCompile Include="KvStoreDao.cpp" />
<ClCompile Include="KvStoreFrontend.cpp" />
<ClCompile Include="LzmaCompressor.cpp" />
<ClCompile Include="pluginmgr.cpp" />
<ClCompile Include="TransactionalKvStore.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="CdZlibCompressor.h" />
<ClInclude Include="CdZstdCompressor.h" />
<ClInclude Include="ClouddriveFactory.h" />
<ClInclude Include="CompressEncrypt.h" />
<ClInclude Include="IClouddriveFactory.h" />
<ClInclude Include="ICompressEncrypt.h" />
<ClInclude Include="IKvStoreBackend.h" />
<ClInclude Include="IOnlineKvStore.h" />
<ClInclude Include="KvStoreBackendS3.h" />
<ClInclude Include="KvStoreDao.h" />
<ClInclude Include="KvStoreFrontend.h" />
<ClInclude Include="LzmaCompressor.h" />
<ClInclude Include="pluginmgr.h" />
<ClInclude Include="TransactionalKvStore.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>

View File

@ -0,0 +1,117 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<Filter Include="Quelldateien">
<UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
<Extensions>cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
</Filter>
<Filter Include="Headerdateien">
<UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
<Extensions>h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd</Extensions>
</Filter>
<Filter Include="Ressourcendateien">
<UniqueIdentifier>{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}</UniqueIdentifier>
<Extensions>rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms</Extensions>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="dllmain.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="ClouddriveFactory.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="pluginmgr.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="CloudFile.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="CompressEncrypt.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="KvStoreBackendS3.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="KvStoreDao.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="KvStoreFrontend.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="LzmaCompressor.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="TransactionalKvStore.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="CdZlibCompressor.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="CdZstdCompressor.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\common\data.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\urbackupcommon\os_functions_win.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\stringtools.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\urbackupcommon\json.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\md5.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="..\urbackupcommon\WalCheckpointThread.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="IClouddriveFactory.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="ClouddriveFactory.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="pluginmgr.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="CdZlibCompressor.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="CdZstdCompressor.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="CompressEncrypt.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="ICompressEncrypt.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="IKvStoreBackend.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="IOnlineKvStore.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="KvStoreBackendS3.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="KvStoreDao.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="KvStoreFrontend.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="LzmaCompressor.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="TransactionalKvStore.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
</Project>

70
clouddrive/dllmain.cpp Normal file
View File

@ -0,0 +1,70 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#ifdef _WIN32
#define DLLEXPORT extern "C" __declspec (dllexport)
#else
#define DLLEXPORT extern "C"
#endif
#ifndef STATIC_PLUGIN
#define DEF_SERVER
#endif
#include "../Interface/Server.h"
#ifndef STATIC_PLUGIN
IServer* Server;
#else
#include "../StaticPluginRegistration.h"
extern IServer* Server;
#define LoadActions LoadActions_clouddrive
#define UnloadActions UnloadActions_clouddrive
#endif
bool is_automount_finished()
{
return false;
}
std::string getCdInterfacePath()
{
return "_UNDEF_";
}
DLLEXPORT void LoadActions(IServer* pServer)
{
Server = pServer;
}
DLLEXPORT void UnloadActions(void)
{
if (Server->getServerParameter("leak_check") == "true")
{
//destroy globals
}
}
#ifdef STATIC_PLUGIN
namespace
{
static RegisterPluginHelper register_plugin(LoadActions, UnloadActions, 10);
}
#endif

29
clouddrive/pluginmgr.cpp Normal file
View File

@ -0,0 +1,29 @@
/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2021 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "pluginmgr.h"
#include "ClouddriveFactory.h"
IPlugin* ImagePluginMgr::createPluginInstance(str_map& params)
{
return nullptr;
}
void ImagePluginMgr::destroyPluginInstance(IPlugin* plugin)
{
}

11
clouddrive/pluginmgr.h Normal file
View File

@ -0,0 +1,11 @@
#pragma once
#include "../Interface/PluginMgr.h"
class ImagePluginMgr : public IPluginMgr
{
public:
virtual IPlugin* createPluginInstance(str_map& params) override;
virtual void destroyPluginInstance(IPlugin* plugin) override;
};