Make amount of download, hash, image compress and client parallel hash threads configurable

This commit is contained in:
Martin 2020-07-21 11:14:59 +02:00
parent 8e1f0b5471
commit 746058f203
49 changed files with 950 additions and 180 deletions

View File

@ -35,6 +35,7 @@ public:
* only works with memory pipe
**/
virtual size_t getNumElements(void)=0;
virtual size_t getNumWaiters() = 0;
virtual void addThrottler(IPipeThrottler *throttler)=0;
virtual void addOutgoingThrottler(IPipeThrottler *throttler)=0;

File diff suppressed because one or more lines are too long

View File

@ -23,7 +23,7 @@
#endif
CMemoryPipe::CMemoryPipe(void)
: has_error(false)
: has_error(false), waiters(0)
{
mutex=Server->createMutex();
cond=Server->createCondition();
@ -44,7 +44,9 @@ size_t CMemoryPipe::Read(char *buffer, size_t bsize, int timeoutms)
int64 currtime=starttime;
while( queue.empty() && starttime+timeoutms>currtime && !has_error)
{
++waiters;
cond->wait( &lock, timeoutms- static_cast<int>(currtime-starttime) );
--waiters;
if(queue.empty())
{
currtime=Server->getTimeMS();
@ -63,7 +65,9 @@ size_t CMemoryPipe::Read(char *buffer, size_t bsize, int timeoutms)
{
while( queue.size()==0 && !has_error )
{
++waiters;
cond->wait(&lock);
--waiters;
}
if(has_error)
@ -117,7 +121,9 @@ size_t CMemoryPipe::Read(std::string *str, int timeoutms )
int64 currtime=starttime;
while( queue.empty() && starttime+timeoutms>currtime && !has_error)
{
++waiters;
cond->wait( &lock, timeoutms- static_cast<int>(currtime-starttime) );
--waiters;
if(queue.empty())
{
currtime=Server->getTimeMS();
@ -136,7 +142,9 @@ size_t CMemoryPipe::Read(std::string *str, int timeoutms )
{
while( queue.size()==0 && !has_error )
{
++waiters;
cond->wait(&lock);
--waiters;
}
if(has_error)
@ -181,10 +189,18 @@ bool CMemoryPipe::isReadable(int timeoutms)
if( queue.size()>0 )
return true;
if(timeoutms>0)
cond->wait( &lock, timeoutms );
else if(timeoutms<0)
if (timeoutms > 0)
{
++waiters;
cond->wait(&lock, timeoutms);
--waiters;
}
else if (timeoutms < 0)
{
++waiters;
cond->wait(&lock);
--waiters;
}
if( queue.size()>0 )
@ -203,6 +219,12 @@ size_t CMemoryPipe::getNumElements(void)
{
IScopedLock lock(mutex);
return queue.size();
}
size_t CMemoryPipe::getNumWaiters()
{
IScopedLock lock(mutex);
return waiters;
}
void CMemoryPipe::shutdown(void)

View File

@ -26,6 +26,7 @@ public:
virtual void shutdown(void);
virtual size_t getNumElements(void);
virtual size_t getNumWaiters();
virtual void addThrottler(IPipeThrottler *throttler);
virtual void addOutgoingThrottler(IPipeThrottler *throttler);
@ -43,6 +44,7 @@ private:
IMutex *mutex;
ICondition *cond;
size_t waiters;
bool has_error;
};

View File

@ -529,6 +529,10 @@ void SChannelPipe::shutdown(void)
bpipe->shutdown();
}
size_t SChannelPipe::getNumWaiters() {
return bpipe->getNumWaiters();
}
size_t SChannelPipe::getNumElements(void)
{
return bpipe->getNumElements();

View File

@ -36,6 +36,8 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters();;
virtual size_t getNumElements(void);
virtual void addThrottler(IPipeThrottler * throttler);

View File

@ -23,6 +23,9 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters() {
return 0;
};
virtual size_t getNumElements(void){ return 0;};
SOCKET getSocket(void);
@ -47,4 +50,6 @@ private:
std::vector<IPipeThrottler*> incoming_throttlers;
std::vector<IPipeThrottler*> outgoing_throttlers;
};

View File

@ -68,7 +68,7 @@ CompressedFile::CompressedFile( std::string pFilename, int pMode, size_t n_threa
blocksize = c_cacheBuffersize;
writeHeader();
hotCache.reset(new LRUMemCache(blocksize, c_ncacheItems, n_threads));
initCompressedBuffers(n_threads+1);
initCompressedBuffers(n_threads);
}
if(hotCache.get())
@ -93,7 +93,7 @@ CompressedFile::CompressedFile(IFile* file, bool openExisting, bool readOnly, si
blocksize = c_cacheBuffersize;
writeHeader();
hotCache.reset(new LRUMemCache(blocksize, c_ncacheItems, n_threads));
initCompressedBuffers(n_threads+1);
initCompressedBuffers(n_threads);
}
if(hotCache.get()!=NULL)
{

View File

@ -538,7 +538,7 @@ bool FSImageFactory::isNTFS(char *buffer)
}
IVHDFile *FSImageFactory::createVHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize,
unsigned int pBlocksize, bool fast_mode, ImageFormat format)
unsigned int pBlocksize, bool fast_mode, ImageFormat format, size_t n_compress_threads)
{
switch(format)
{
@ -556,7 +556,7 @@ IVHDFile *FSImageFactory::createVHDFile(const std::string &fn, bool pRead_only,
}
IVHDFile *FSImageFactory::createVHDFile(const std::string &fn, const std::string &parent_fn,
bool pRead_only, bool fast_mode, ImageFormat format, uint64 pDstsize)
bool pRead_only, bool fast_mode, ImageFormat format, uint64 pDstsize, size_t n_compress_threads)
{
switch(format)
{

View File

@ -9,10 +9,12 @@ public:
bool background_priority, std::string orig_letter, IFsNextBlockCallback* next_block_callback);
virtual IVHDFile *createVHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize,
unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, IFSImageFactory::ImageFormat format=IFSImageFactory::ImageFormat_VHD);
unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, IFSImageFactory::ImageFormat format=IFSImageFactory::ImageFormat_VHD,
size_t n_compress_threads=0);
virtual IVHDFile *createVHDFile(const std::string &fn, const std::string &parent_fn,
bool pRead_only, bool fast_mode=false, IFSImageFactory::ImageFormat format=IFSImageFactory::ImageFormat_VHD, uint64 pDstsize=0);
bool pRead_only, bool fast_mode=false, IFSImageFactory::ImageFormat format=IFSImageFactory::ImageFormat_VHD, uint64 pDstsize=0,
size_t n_compress_threads = 0);
virtual void destroyVHDFile(IVHDFile *vhd);

View File

@ -39,10 +39,12 @@ public:
};
virtual IVHDFile *createVHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize,
unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, ImageFormat compress=ImageFormat_VHD)=0;
unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, ImageFormat compress=ImageFormat_VHD,
size_t n_compress_threads = 0)=0;
virtual IVHDFile *createVHDFile(const std::string &fn, const std::string &parent_fn,
bool pRead_only, bool fast_mode=false, ImageFormat compress=ImageFormat_VHD, uint64 pDstsize=0)=0;
bool pRead_only, bool fast_mode=false, ImageFormat compress=ImageFormat_VHD, uint64 pDstsize=0,
size_t n_compress_threads = 0)=0;
virtual void destroyVHDFile(IVHDFile *vhd)=0;

View File

@ -18,18 +18,22 @@
#include "LRUMemCache.h"
#include "../Interface/Server.h"
#include "../stringtools.h"
#include <string.h>
#include <assert.h>
LRUMemCache::LRUMemCache(size_t buffersize, size_t nbuffers, size_t n_threads)
LRUMemCache::LRUMemCache(size_t buffersize, size_t nbuffers, size_t p_n_threads)
: buffersize(buffersize), nbuffers(nbuffers), callback(NULL),
mutex(Server->createMutex()), cond(Server->createCondition()), n_threads(n_threads),
mutex(Server->createMutex()), cond(Server->createCondition()), n_threads(p_n_threads),
do_quit(false), n_threads_working(0), cond_wait(Server->createCondition()), wait_work(false)
{
if (n_threads > 0)
--n_threads;
for (size_t i = 0; i < n_threads; ++i)
{
Server->createThread(this, "comp img");
Server->createThread(this, "comp img"+convert(i));
}
}

View File

@ -48,13 +48,15 @@ namespace
size_t getNumCompThreads(bool read_only)
{
if (read_only)
return 0;
return 1;
const size_t maxCpus = 5;
#ifdef _WIN32
SYSTEM_INFO system_info;
GetSystemInfo(&system_info);
DWORD numCpus = system_info.dwNumberOfProcessors;
if (numCpus == 0)
return 1;
return (std::min)(static_cast<size_t>(numCpus), maxCpus);
#else
long numCpus = sysconf(_SC_NPROCESSORS_ONLN);
@ -62,12 +64,16 @@ namespace
{
numCpus = 2;
}
else if (numCpus == 0)
{
numCpus = 1;
}
return (std::min)(static_cast<size_t>(numCpus), maxCpus);
#endif
}
}
VHDFile::VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsigned int pBlocksize, bool fast_mode, bool compress)
VHDFile::VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsigned int pBlocksize, bool fast_mode, bool compress, size_t compress_n_threads)
: dstsize(pDstsize), blocksize(pBlocksize), fast_mode(fast_mode), bitmap_offset(0), bitmap_dirty(false), volume_offset(0), finished(false),
file(NULL)
{
@ -98,7 +104,7 @@ VHDFile::VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsign
if(check_if_compressed() || compress)
{
compressed_file = new CompressedFile(backing_file, openedExisting, read_only, getNumCompThreads(pRead_only));
compressed_file = new CompressedFile(backing_file, openedExisting, read_only, compress_n_threads==0 ? getNumCompThreads(pRead_only): compress_n_threads);
file = compressed_file;
if(compressed_file->hasError())
@ -162,7 +168,7 @@ VHDFile::VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsign
}
}
VHDFile::VHDFile(const std::string &fn, const std::string &parent_fn, bool pRead_only, bool fast_mode, bool compress, uint64 pDstsize)
VHDFile::VHDFile(const std::string &fn, const std::string &parent_fn, bool pRead_only, bool fast_mode, bool compress, uint64 pDstsize, size_t compress_n_threads)
: fast_mode(fast_mode), bitmap_offset(0), bitmap_dirty(false), volume_offset(0), finished(false), file(NULL)
{
compressed_file=NULL;
@ -191,7 +197,7 @@ VHDFile::VHDFile(const std::string &fn, const std::string &parent_fn, bool pRead
if(check_if_compressed() || compress)
{
file = new CompressedFile(backing_file, openedExisting, read_only, getNumCompThreads(pRead_only));
file = new CompressedFile(backing_file, openedExisting, read_only, compress_n_threads==0 ? getNumCompThreads(pRead_only) : compress_n_threads);
}
else
{

View File

@ -64,8 +64,8 @@ class CompressedFile;
class VHDFile : public IVHDFile, public IFile
{
public:
VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, bool compress=false);
VHDFile(const std::string &fn, const std::string &parent_fn, bool pRead_only, bool fast_mode=false, bool compress=false, uint64 pDstsize=0);
VHDFile(const std::string &fn, bool pRead_only, uint64 pDstsize, unsigned int pBlocksize=2*1024*1024, bool fast_mode=false, bool compress=false, size_t compress_n_threads=0);
VHDFile(const std::string &fn, const std::string &parent_fn, bool pRead_only, bool fast_mode=false, bool compress=false, uint64 pDstsize=0, size_t compress_n_threads = 0);
~VHDFile();
virtual std::string Read(_u32 tr, bool *has_error=NULL);

View File

@ -1,10 +1,12 @@
#include "ParallelHash.h"
#include "../Interface/Server.h"
#include "../Interface/ThreadPool.h"
#include "../Interface/Condition.h"
#include "ClientHash.h"
#include <algorithm>
#include "database.h"
#include "../stringtools.h"
#include <assert.h>
//#define HASH_CBT_CHECK
@ -15,13 +17,16 @@ namespace
const int64 link_file_min_size = 2048;
}
ParallelHash::ParallelHash(SQueueRef* phash_queue, int sha_version)
ParallelHash::ParallelHash(SQueueRef* phash_queue, int sha_version, size_t extra_n_threads)
: do_quit(false), phash_queue(phash_queue), phash_queue_pos(0),
stdout_buf_size(0), stdout_buf_pos(0), mutex(Server->createMutex()),
last_file_buffer_commit_time(0), sha_version(sha_version), eof(false)
last_file_buffer_commit_time(0), sha_version(sha_version), eof(false),
extra_n_threads(extra_n_threads), extra_thread(false),
extra_mutex(Server->createMutex()), extra_cond(Server->createCondition()),
modify_file_buffer_mutex(Server->createMutex())
{
stdout_buf.resize(4090);
ticket = Server->getThreadPool()->execute(this, "phash");
ticket = Server->getThreadPool()->execute(this, extra_n_threads>0 ? "phash master": "phash");
}
bool ParallelHash::getExitCode(int & exit_code)
@ -94,6 +99,18 @@ bool ParallelHash::readStderrIntoBuffer(char * buf, size_t buf_avail, size_t & r
void ParallelHash::operator()()
{
if (extra_thread)
{
runExtraThread();
return;
}
extra_thread = true;
for (size_t i = 0; i < extra_n_threads; ++i)
{
extra_tickets.push_back(Server->getThreadPool()->execute(this, "phash e"+convert(i)));
}
ClientDAO clientdao(Server->getDatabase(Server->getThreadID(), URBACKUPDB_CLIENT));
int mode = MODE_READ_SEQUENTIAL;
@ -117,10 +134,29 @@ void ParallelHash::operator()()
had_msg = true;
std::string msg = phashf->Read(phash_queue_pos + sizeof(_u32), msg_size);
CRData data(msg.data(), msg.size());
int64 working_file_id = phash_queue_pos;
phash_queue_pos += sizeof(_u32) + msg_size;
hashFile(data, clientdao);
IScopedLock lock(extra_mutex.get());
working_file_ids.insert(working_file_id);
if (extra_n_threads > 0)
{
extra_queue.push_back(std::make_pair(working_file_id, msg));
extra_cond->notify_one();
}
else
{
lock.relock(NULL);
CRData data(msg.data(), msg.size());
hashFile(working_file_id, data, clientdao);
lock.relock(extra_mutex.get());
working_file_ids.erase(working_file_id);
addQueuedStdoutMsgs();
}
if (eof)
{
@ -142,6 +178,14 @@ void ParallelHash::operator()()
commitModifyFileBuffer(clientdao);
{
IScopedLock lock(extra_mutex.get());
do_quit = true;
extra_cond->notify_all();
}
Server->getThreadPool()->waitFor(extra_tickets);
if (phash_queue->deref())
{
delete phash_queue;
@ -149,7 +193,18 @@ void ParallelHash::operator()()
}
}
bool ParallelHash::hashFile(CRData & data, ClientDAO& clientdao)
void ParallelHash::addQueuedStdoutMsgs()
{
std::map<int64, CWData>::iterator it_msg;
while (!working_file_ids.empty() &&
(it_msg = stdout_msg_buf.find(*working_file_ids.begin())) != stdout_msg_buf.end())
{
addToStdoutBuf(it_msg->second.getDataPtr(), it_msg->second.getDataSize());
stdout_msg_buf.erase(it_msg);
}
}
bool ParallelHash::hashFile(int64 working_file_id, CRData & data, ClientDAO& clientdao)
{
char id;
if (!data.getChar(&id))
@ -157,71 +212,50 @@ bool ParallelHash::hashFile(CRData & data, ClientDAO& clientdao)
if (id == ID_SET_CURR_DIRS)
{
curr_files.clear();
if (!data.getStr2(&curr_dir)
|| !data.getInt(&curr_tgroup)
|| !data.getStr2(&curr_snapshot_dir))
int64 dir_id;
SCurrDir dir;
if (!data.getVarInt(&dir_id)
|| !data.getStr2(&dir.dir)
|| !data.getInt(&dir.tgroup)
|| !data.getStr2(&dir.snapshot_dir))
{
assert(false);
return false;
}
dir.finish = false;
IScopedLock lock(mutex.get());
curr_dirs[dir_id] = dir;
return true;
}
else if (id == ID_FINISH_CURR_DIR)
{
int64 target_generation;
if (!data.getVarInt(&target_generation))
int64 id;
int64 dir_files;
if (!data.getVarInt(&id)
|| !data.getVarInt(&target_generation)
|| !data.getVarInt(&dir_files))
{
curr_files.clear();
assert(false);
return false;
}
#ifndef _WIN32
std::string path_lower = curr_dir + os_file_sep();
#else
std::string path_lower = strlower(curr_dir + os_file_sep());
#endif
std::vector<SFileAndHash> files;
int64 generation = -1;
if (clientdao.getFiles(path_lower, curr_tgroup, files, generation))
SCurrDir* dir;
{
if (generation != target_generation)
IScopedLock lock(mutex.get());
dir = &curr_dirs[id];
if (dir->files.size() != dir_files)
{
curr_files.clear();
dir->finish = true;
dir->target_generation = target_generation;
dir->dir_target_nfiles = dir_files;
return true;
}
std::sort(curr_files.begin(), curr_files.end());
bool added_hash = false;
for (size_t i = 0; i < files.size(); ++i)
{
if (files[i].hash.empty())
{
std::vector<SFileAndHash>::iterator it =
std::lower_bound(curr_files.begin(), curr_files.end(), files[i]);
if (it != curr_files.end()
&& it->name == files[i].name)
{
files[i].hash = it->hash;
added_hash = true;
}
}
}
if (added_hash)
{
addModifyFileBuffer(clientdao, path_lower, curr_tgroup, files, target_generation);
}
curr_files.clear();
return true;
}
curr_files.clear();
return false;
return finishDir(dir, clientdao, target_generation, id);
}
else if (id == ID_INIT_HASH)
{
@ -254,22 +288,38 @@ bool ParallelHash::hashFile(CRData & data, ClientDAO& clientdao)
if (id != ID_HASH_FILE)
{
assert(false);
return false;
}
int64 file_id;
if (!data.getVarInt(&file_id))
{
assert(false);
return false;
}
std::string fn;
if (!data.getStr2(&fn))
{
assert(false);
return false;
}
std::string full_path = curr_snapshot_dir + os_file_sep() + fn;
int64 dir_id;
if (!data.getVarInt(&dir_id))
{
assert(false);
return false;
}
SCurrDir* dir;
{
IScopedLock lock(mutex.get());
dir = &curr_dirs[dir_id];
}
std::string full_path = dir->snapshot_dir + os_file_sep() + fn;
std::auto_ptr<IFsFile> f(Server->openFile(os_file_prefix(full_path), MODE_READ_SEQUENTIAL_BACKUP));
@ -345,13 +395,91 @@ bool ParallelHash::hashFile(CRData & data, ClientDAO& clientdao)
wdata.addString2(fandhash.hash);
fandhash.name = fn;
*reinterpret_cast<_u16*>(wdata.getDataPtr()) = little_endian(static_cast<_u16>(wdata.getDataSize() - sizeof(_u16)));
curr_files.push_back(fandhash);
bool finish_dir = false;
{
IScopedLock lock(mutex.get());
dir->files.push_back(fandhash);
if (dir->finish &&
dir->files.size() == dir->dir_target_nfiles)
{
finish_dir = true;
}
}
if (finish_dir)
{
finishDir(dir, clientdao, dir->target_generation, dir_id);
}
Server->Log("Parallel hash \"" + full_path + "\" id=" + convert(file_id) + " hash=" + base64_encode_dash(fandhash.hash)+ ph_action, LL_DEBUG);
{
IScopedLock lock(mutex.get());
if (!working_file_ids.empty()
&& *working_file_ids.begin() != working_file_id)
{
stdout_msg_buf[working_file_id] = wdata;
return true;
}
}
return addToStdoutBuf(wdata.getDataPtr(), wdata.getDataSize());
}
bool ParallelHash::finishDir(ParallelHash::SCurrDir* dir, ClientDAO& clientdao, const int64& target_generation, int64& id)
{
#ifndef _WIN32
std::string path_lower = dir->dir + os_file_sep();
#else
std::string path_lower = strlower(dir->dir + os_file_sep());
#endif
std::vector<SFileAndHash> files;
int64 generation = -1;
if (clientdao.getFiles(path_lower, dir->tgroup, files, generation))
{
if (generation != target_generation)
{
IScopedLock lock(mutex.get());
curr_dirs.erase(id);
return true;
}
std::sort(dir->files.begin(), dir->files.end());
bool added_hash = false;
for (size_t i = 0; i < files.size(); ++i)
{
if (files[i].hash.empty())
{
std::vector<SFileAndHash>::iterator it =
std::lower_bound(dir->files.begin(), dir->files.end(), files[i]);
if (it != dir->files.end()
&& it->name == files[i].name)
{
files[i].hash = it->hash;
added_hash = true;
}
}
}
if (added_hash)
{
addModifyFileBuffer(clientdao, path_lower, dir->tgroup, files, target_generation);
}
IScopedLock lock(mutex.get());
curr_dirs.erase(id);
return true;
}
IScopedLock lock(mutex.get());
curr_dirs.erase(id);
return false;
}
bool ParallelHash::addToStdoutBuf(const char * ptr, size_t size)
{
IScopedLock lock(mutex.get());
@ -392,9 +520,35 @@ size_t ParallelHash::calcBufferSize(const std::string &path, const std::vector<S
return add_size;
}
void ParallelHash::runExtraThread()
{
IScopedLock lock(extra_mutex.get());
while (!do_quit)
{
while (extra_queue.empty() &&
do_quit)
{
extra_cond->wait(&lock);
}
std::pair<int64, std::string> msg = extra_queue.front();
extra_queue.pop_front();
lock.relock(NULL);
lock.relock(extra_mutex.get());
working_file_ids.erase(msg.first);
addQueuedStdoutMsgs();
}
}
void ParallelHash::addModifyFileBuffer(ClientDAO& clientdao, const std::string & path, int tgroup,
const std::vector<SFileAndHash>& files, int64 target_generation)
{
IScopedLock lock(modify_file_buffer_mutex.get());
modify_file_buffer_size += calcBufferSize(path, files);
modify_file_buffer.push_back(SBufferItem(path, tgroup, files, target_generation));

View File

@ -8,6 +8,8 @@
#include "clientdao.h"
#include "client.h"
#include <memory>
#include <deque>
#include <set>
namespace
{
@ -24,7 +26,7 @@ class ClientHash;
class ParallelHash : public IPipeFileExt, public IThread
{
public:
ParallelHash(SQueueRef* phash_queue, int sha_version);
ParallelHash(SQueueRef* phash_queue, int sha_version, size_t extra_n_threads);
virtual bool getExitCode(int & exit_code);
virtual void forceExit();
@ -34,12 +36,30 @@ public:
void operator()();
void addQueuedStdoutMsgs();
private:
bool hashFile(CRData& data, ClientDAO& clientdao);
struct SCurrDir
{
int tgroup;
std::string dir;
std::string snapshot_dir;
std::vector<SFileAndHash> files;
int64 target_generation;
int64 dir_target_nfiles;
bool finish;
};
bool hashFile(int64 working_file_id, CRData& data, ClientDAO& clientdao);
bool finishDir(ParallelHash::SCurrDir* dir, ClientDAO& clientdao, const int64& target_generation, int64& id);
bool addToStdoutBuf(const char* ptr, size_t size);
void addModifyFileBuffer(ClientDAO& clientdao, const std::string& path, int tgroup, const std::vector<SFileAndHash>& files, int64 target_generation);
void commitModifyFileBuffer(ClientDAO& clientdao);
size_t calcBufferSize(const std::string &path, const std::vector<SFileAndHash> &data);
void runExtraThread();
std::map<int64, CWData> stdout_msg_buf;
std::set<int64> working_file_ids;
std::vector<char> stdout_buf;
size_t stdout_buf_pos;
@ -48,14 +68,16 @@ private:
volatile bool eof;
int64 phash_queue_pos;
SQueueRef* phash_queue;
std::auto_ptr<IMutex> mutex;
std::string curr_dir;
int curr_tgroup;
std::string curr_snapshot_dir;
std::vector<SFileAndHash> curr_files;
std::auto_ptr<IMutex> mutex;
std::map<int64, SCurrDir> curr_dirs;
std::deque<std::string> postponed_finish;
std::auto_ptr<ClientHash> client_hash;
int sha_version;
THREADPOOL_TICKET ticket;
std::vector<THREADPOOL_TICKET> extra_tickets;
struct SBufferItem
{
@ -69,7 +91,14 @@ private:
int64 target_generation;
};
std::auto_ptr<IMutex> modify_file_buffer_mutex;
std::vector< SBufferItem > modify_file_buffer;
size_t modify_file_buffer_size;
int64 last_file_buffer_commit_time;
size_t extra_n_threads;
bool extra_thread;
std::deque<std::pair<int64, std::string> > extra_queue;
std::auto_ptr<IMutex> extra_mutex;
std::auto_ptr<ICondition> extra_cond;
};

View File

@ -2489,6 +2489,8 @@ bool IndexThread::initialCheck(std::vector<SRecurParams>& params_stack, size_t s
}
bool finish_phash_path = false;
int64 phash_dir_id = file_id;
int64 phash_dir_files = 0;
bool has_include = false;
for(size_t i=0;i<files.size();++i)
@ -2544,7 +2546,8 @@ bool IndexThread::initialCheck(std::vector<SRecurParams>& params_stack, size_t s
}
else if (calculate_filehashes_on_client
&& phash_queue != NULL
&& !files[i].isspecialf)
&& !files[i].isspecialf
&& files[i].size>=link_file_min_size)
{
if (!finish_phash_path)
{
@ -2552,16 +2555,19 @@ bool IndexThread::initialCheck(std::vector<SRecurParams>& params_stack, size_t s
CWData wdata;
wdata.addChar(ID_SET_CURR_DIRS);
wdata.addVarInt(phash_dir_id);
wdata.addString2(orig_dir);
wdata.addInt(index_group);
wdata.addString2(dir);
addToPhashQueue(wdata);
}
++phash_dir_files;
CWData wdata;
wdata.addChar(ID_HASH_FILE);
wdata.addVarInt(file_id);
wdata.addString2(files[i].name);
wdata.addVarInt(phash_dir_id);
addToPhashQueue(wdata);
}
@ -2599,7 +2605,9 @@ bool IndexThread::initialCheck(std::vector<SRecurParams>& params_stack, size_t s
{
CWData wdata;
wdata.addChar(ID_FINISH_CURR_DIR);
wdata.addVarInt(phash_dir_id);
wdata.addVarInt(target_generation);
wdata.addVarInt(phash_dir_files);
addToPhashQueue(wdata);
}
@ -9034,13 +9042,28 @@ void IndexThread::initParallelHashing(const std::string & async_ticket)
{
delete phash_queue;
}
std::string settings_fn = "urbackup/data/settings.cfg";
if (!index_clientsubname.empty())
{
settings_fn = "urbackup/data/settings_" + conv_filename(index_clientsubname) + ".cfg";
}
std::auto_ptr<ISettingsReader> curr_settings(Server->createFileSettingsReader(settings_fn));
size_t client_hash_threads = 0;
if (curr_settings.get() != NULL)
{
client_hash_threads = curr_settings->getValue("client_hash_threads", 0);
}
std::string fn = "phash_" + bytesToHex(async_ticket);
phash_queue = new SQueueRef(Server->openTemporaryFile(), this, fn);
phash_queue->ref();
phash_queue_write_pos = 0;
os_create_dir(Server->getServerWorkingDir() + "urbackup" + os_file_sep() + "phash");
filesrv->shareDir("phash_{9c28ff72-5a74-487b-b5e1-8f1c96cd0cf4}", Server->getServerWorkingDir() + "/urbackup/phash", std::string(), true);
ParallelHash* phash = new ParallelHash(phash_queue->ref(), sha_version);
ParallelHash* phash = new ParallelHash(phash_queue->ref(), sha_version, client_hash_threads);
filesrv->registerScriptPipeFile(fn, phash);
}

View File

@ -315,6 +315,11 @@ void CompressedPipe::shutdown(void)
cs->shutdown();
}
size_t CompressedPipe::getNumWaiters()
{
return cs->getNumWaiters();
}
size_t CompressedPipe::getNumElements(void)
{
return cs->getNumElements();

View File

@ -36,6 +36,8 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters();;
virtual size_t getNumElements(void);
void destroyBackendPipeOnDelete(bool b);

View File

@ -448,6 +448,10 @@ void CompressedPipe2::shutdown(void)
cs->shutdown();
}
size_t CompressedPipe2::getNumWaiters() {
return cs->getNumWaiters();
}
size_t CompressedPipe2::getNumElements(void)
{
return cs->getNumElements();

View File

@ -38,6 +38,8 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters();;
virtual size_t getNumElements(void);
virtual void destroyBackendPipeOnDelete(bool b);

View File

@ -474,6 +474,10 @@ void CompressedPipeZstd::shutdown(void)
cs->shutdown();
}
size_t CompressedPipeZstd::getNumWaiters() {
return cs->getNumWaiters();
}
size_t CompressedPipeZstd::getNumElements(void)
{
return cs->getNumElements();

View File

@ -33,6 +33,8 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters();;
virtual size_t getNumElements(void);
virtual void destroyBackendPipeOnDelete(bool b);

View File

@ -146,6 +146,11 @@ void InternetServicePipe::shutdown(void)
cs->shutdown();
}
size_t InternetServicePipe::getNumWaiters()
{
return cs->getNumWaiters();
}
size_t InternetServicePipe::getNumElements(void)
{
return cs->getNumElements();

View File

@ -49,6 +49,8 @@ public:
virtual void shutdown(void);
virtual size_t getNumWaiters();;
virtual size_t getNumElements(void);
void destroyBackendPipeOnDelete(bool b);

View File

@ -242,6 +242,10 @@ void InternetServicePipe2::shutdown( void )
cs->shutdown();
}
size_t InternetServicePipe2::getNumWaiters() {
return cs->getNumWaiters();
}
size_t InternetServicePipe2::getNumElements( void )
{
return cs->getNumElements();

View File

@ -49,6 +49,8 @@ public:
virtual void shutdown( void );
virtual size_t getNumWaiters();;
virtual size_t getNumElements( void );
virtual void addThrottler( IPipeThrottler *throttler );

View File

@ -46,6 +46,10 @@ public:
{
pipe->shutdown();
}
virtual size_t getNumWaiters()
{
return pipe->getNumWaiters();
};
virtual size_t getNumElements(void)
{
return pipe->getNumElements();

View File

@ -107,6 +107,10 @@ std::vector<std::string> getSettingsList(void)
ret.push_back("client_settings_tray_access_pw");
ret.push_back("local_encrypt");
ret.push_back("local_compress");
ret.push_back("download_threads");
ret.push_back("hash_threads");
ret.push_back("client_hash_threads");
ret.push_back("image_compress_threads");
return ret;
}

View File

@ -581,36 +581,43 @@ std::string FileBackup::clientlistName(int ref_backupid)
void FileBackup::createHashThreads(bool use_reflink, bool ignore_hash_mismatches)
{
assert(bsh==NULL);
assert(bsh_prepare==NULL);
assert(bsh.empty());
assert(bsh_prepare.empty());
hashpipe=Server->createMemoryPipe();
hashpipe_prepare=Server->createMemoryPipe();
bsh=new BackupServerHash(hashpipe, clientid, use_snapshots, use_reflink, use_tmpfiles, logid, use_snapshots, max_file_id);
bsh_prepare=new BackupServerPrepareHash(hashpipe_prepare, hashpipe, clientid, logid, ignore_hash_mismatches);
bsh_ticket = Server->getThreadPool()->execute(bsh, "fbackup write");
bsh_prepare_ticket = Server->getThreadPool()->execute(bsh_prepare, "fbackup hash");
size_t h_cnt = server_settings->getSettings()->hash_threads;
for (size_t i = 0; i < h_cnt; ++i)
{
BackupServerHash* curr_bsh = new BackupServerHash(hashpipe, clientid, use_snapshots, use_reflink, use_tmpfiles, logid, use_snapshots, max_file_id);
BackupServerPrepareHash* curr_bsh_prepare = new BackupServerPrepareHash(hashpipe_prepare, hashpipe, clientid, logid, ignore_hash_mismatches);
bsh.push_back(curr_bsh);
bsh_prepare.push_back(curr_bsh_prepare);
bsh_ticket.push_back(Server->getThreadPool()->execute(curr_bsh, "fbackup write" + convert(i)));
bsh_prepare_ticket.push_back(Server->getThreadPool()->execute(curr_bsh_prepare, "fbackup hash" + convert(i)));
}
}
void FileBackup::destroyHashThreads()
{
if (hashpipe_prepare != NULL)
if (!bsh_prepare.empty())
{
assert(bsh_ticket != ILLEGAL_THREADPOOL_TICKET);
assert(bsh_prepare_ticket != ILLEGAL_THREADPOOL_TICKET);
hashpipe_prepare->Write("exit");
Server->getThreadPool()->waitFor(bsh_ticket);
Server->getThreadPool()->waitFor(bsh_prepare_ticket);
Server->destroy(hashpipe_prepare);
Server->destroy(hashpipe);
}
bsh_ticket=ILLEGAL_THREADPOOL_TICKET;
bsh_prepare_ticket=ILLEGAL_THREADPOOL_TICKET;
bsh_ticket.clear();
bsh_prepare_ticket.clear();
hashpipe=NULL;
hashpipe_prepare=NULL;
bsh=NULL;
bsh_prepare=NULL;
bsh.clear();
bsh_prepare.clear();
}
_i64 FileBackup::getIncrementalSize(IFile *f, const std::vector<size_t> &diffs, bool& backup_with_components, bool all)
@ -1219,18 +1226,30 @@ void FileBackup::waitForFileThreads(void)
SStatus status=ServerStatus::getStatus(clientname);
hashpipe->Write("flush");
hashpipe_prepare->Write("flush");
_u32 hashqueuesize=(_u32)hashpipe->getNumElements()+(bsh->isWorking()?1:0);
_u32 prepare_hashqueuesize=(_u32)hashpipe_prepare->getNumElements()+(bsh_prepare->isWorking()?1:0);
while(hashqueuesize>0 || prepare_hashqueuesize>0)
size_t hashqueuesize=std::string::npos;
size_t prepare_hashqueuesize=0;
while(hashqueuesize==std::string::npos || hashqueuesize>0 || prepare_hashqueuesize>0)
{
ServerStatus::setProcessQueuesize(clientname, status_id, prepare_hashqueuesize, hashqueuesize);
Server->wait(1000);
hashqueuesize=(_u32)hashpipe->getNumElements()+(bsh->isWorking()?1:0);
prepare_hashqueuesize=(_u32)hashpipe_prepare->getNumElements()+(bsh_prepare->isWorking()?1:0);
if (hashqueuesize != std::string::npos)
{
ServerStatus::setProcessQueuesize(clientname, status_id, prepare_hashqueuesize, hashqueuesize);
Server->wait(1000);
}
size_t bsh_working = bsh.size() - hashpipe->getNumWaiters();
size_t bsh_prepare_working = bsh_prepare.size() - hashpipe_prepare->getNumWaiters();
hashqueuesize = hashpipe->getNumElements() + bsh_working;
prepare_hashqueuesize = hashpipe_prepare->getNumElements() + bsh_prepare_working;
}
{
Server->wait(10);
while(bsh->isWorking()) Server->wait(1000);
while (hashpipe->getNumWaiters() < bsh.size())
{
Server->wait(1000);
}
}
ServerStatus::setProcessQueuesize(clientname, status_id, 0, 0);

View File

@ -266,10 +266,10 @@ protected:
IPipe *hashpipe;
IPipe *hashpipe_prepare;
BackupServerHash *bsh;
THREADPOOL_TICKET bsh_ticket;
BackupServerPrepareHash *bsh_prepare;
THREADPOOL_TICKET bsh_prepare_ticket;
std::vector<BackupServerHash*> bsh;
std::vector<THREADPOOL_TICKET> bsh_ticket;
std::vector<BackupServerPrepareHash*> bsh_prepare;
std::vector<THREADPOOL_TICKET> bsh_prepare_ticket;
std::auto_ptr<BackupServerHash> local_hash;
std::auto_ptr<BackupServerHash> local_hash2;

View File

@ -26,7 +26,7 @@
#include "../urbackupcommon/fileclient/FileClient.h"
#include "../urbackupcommon/filelist_utils.h"
#include "server_running.h"
#include "ServerDownloadThread.h"
#include "ServerDownloadThreadGroup.h"
#include "../urbackupcommon/file_metadata.h"
#include "FileMetadataDownloadThread.h"
#include "snapshot_helper.h"
@ -223,19 +223,17 @@ bool FullFileBackup::doFileBackup()
std::string last_backuppath;
std::string last_backuppath_complete;
std::auto_ptr<ServerDownloadThread> server_download(new ServerDownloadThread(fc, NULL, backuppath,
std::auto_ptr<ServerDownloadThreadGroup> server_download(new ServerDownloadThreadGroup(fc, NULL, backuppath,
backuppath_hashes, last_backuppath, last_backuppath_complete,
hashed_transfer, save_incomplete_files, clientid, clientname, clientsubname,
use_tmpfiles, tmpfile_path, server_token, use_reflink,
backupid, false, hashpipe_prepare, client_main, client_main->getProtocolVersions().filesrv_protocol_version,
0, logid, with_hashes, shares_without_snapshot, with_sparse_hashing, metadata_download_thread.get(),
backup_with_components, filepath_corrections, max_file_id));
backup_with_components, server_settings->getSettings()->download_threads, server_settings.get(),
false, filepath_corrections, max_file_id));
bool queue_downloads = client_main->getProtocolVersions().filesrv_protocol_version>2;
THREADPOOL_TICKET server_download_ticket =
Server->getThreadPool()->execute(server_download.get(), "fbackup load");
ServerStatus::setProcessTotalBytes(clientname, status_id, files_size);
fc.resetReceivedDataBytes(true);
@ -618,7 +616,7 @@ bool FullFileBackup::doFileBackup()
ServerLogger::Log(logid, "Waiting for file transfers...", LL_INFO);
while(!Server->getThreadPool()->waitFor(server_download_ticket, 1000))
while(!server_download->join(1000))
{
if(files_size==0)
{
@ -716,6 +714,7 @@ bool FullFileBackup::doFileBackup()
std::stack<size_t> last_modified_offsets;
script_dir=false;
has_read_error = false;
size_t download_max_ok_id = server_download->getMaxOkId();
while( (read=tmp_filelist->Read(buffer, 4096, &has_read_error))>0 )
{
if (has_read_error)
@ -802,7 +801,7 @@ bool FullFileBackup::doFileBackup()
}
}
else if(!cf.isdir &&
line <= (std::max)(server_download->getMaxOkId(), max_ok_id) &&
line <= (std::max)(download_max_ok_id, max_ok_id) &&
server_download->isDownloadOk(line) )
{
bool metadata_missing = (!script_dir && metadata_download_thread.get()!=NULL
@ -865,11 +864,18 @@ bool FullFileBackup::doFileBackup()
}
}
if( bsh->hasError() || bsh_prepare->hasError() )
for (size_t i = 0; i < bsh.size(); ++i)
{
disk_error=true;
if (bsh[i]->hasError())
disk_error = true;
}
else if(verification_ok)
for (size_t i = 0; i < bsh_prepare.size(); ++i)
{
if (bsh_prepare[i]->hasError())
disk_error = true;
}
if(!disk_error && verification_ok)
{
FileIndex::flush();

View File

@ -1062,12 +1062,13 @@ bool ImageBackup::doImage(const std::string &pLetter, const std::string &pParent
{
r_vhdfile=image_fak->createVHDFile(os_file_prefix(imagefn), false, drivesize+mbr_size,
(unsigned int)vhd_blocksize*blocksize, true,
image_format);
image_format, server_settings->getSettings()->image_compress_threads);
}
else
{
r_vhdfile=image_fak->createVHDFile(os_file_prefix(imagefn), pParentvhd, false,
true, image_format, drivesize + mbr_size);
true, image_format, drivesize + mbr_size,
server_settings->getSettings()->image_compress_threads);
}
if(r_vhdfile==NULL || !r_vhdfile->isOpen())

View File

@ -25,7 +25,7 @@
#include "server_dir_links.h"
#include "server_running.h"
#include "server_cleanup.h"
#include "ServerDownloadThread.h"
#include "ServerDownloadThreadGroup.h"
#include "FileIndex.h"
#include <stack>
#include "../urbackupcommon/file_metadata.h"
@ -523,19 +523,17 @@ bool IncrFileBackup::doFileBackup()
bool backup_with_components;
_i64 files_size = getIncrementalSize(tmp_filelist, diffs, backup_with_components);
std::auto_ptr<ServerDownloadThread> server_download(new ServerDownloadThread(fc, fc_chunked.get(), backuppath,
std::auto_ptr<ServerDownloadThreadGroup> server_download(new ServerDownloadThreadGroup(fc, fc_chunked.get(), backuppath,
backuppath_hashes, last_backuppath, last_backuppath_complete,
hashed_transfer, intra_file_diffs, clientid, clientname, clientsubname,
use_tmpfiles, tmpfile_path, server_token, use_reflink,
backupid, r_incremental, hashpipe_prepare, client_main, client_main->getProtocolVersions().filesrv_protocol_version,
incremental_num, logid, with_hashes, shares_without_snapshot, with_sparse_hashing, metadata_download_thread.get(),
backup_with_components, filepath_corrections, max_file_id));
backup_with_components, server_settings->getSettings()->download_threads, server_settings.get(),
intra_file_diffs, filepath_corrections, max_file_id));
bool queue_downloads = client_main->getProtocolVersions().filesrv_protocol_version>2;
THREADPOOL_TICKET server_download_ticket =
Server->getThreadPool()->execute(server_download.get(), "fbackup load");
char buffer[4096];
_u32 read;
std::string curr_path;
@ -1370,7 +1368,7 @@ bool IncrFileBackup::doFileBackup()
ServerLogger::Log(logid, "Waiting for file transfers...", LL_INFO);
while(!Server->getThreadPool()->waitFor(server_download_ticket, 1000))
while(!server_download->join(1000))
{
if(files_size==0)
{
@ -1431,9 +1429,15 @@ bool IncrFileBackup::doFileBackup()
waitForFileThreads();
if( bsh->hasError() || bsh_prepare->hasError() )
for (size_t i = 0; i < bsh.size(); ++i)
{
disk_error=true;
if (bsh[i]->hasError())
disk_error = true;
}
for (size_t i = 0; i < bsh_prepare.size(); ++i)
{
if (bsh_prepare[i]->hasError())
disk_error = true;
}
if (!r_offline && !c_has_error && !disk_error)

View File

@ -45,7 +45,8 @@ namespace
ServerDownloadThread::ServerDownloadThread( FileClient& fc, FileClientChunked* fc_chunked, const std::string& backuppath, const std::string& backuppath_hashes, const std::string& last_backuppath, const std::string& last_backuppath_complete, bool hashed_transfer, bool save_incomplete_file, int clientid,
const std::string& clientname, const std::string& clientsubname, bool use_tmpfiles, const std::string& tmpfile_path, const std::string& server_token, bool use_reflink, int backupid, bool r_incremental, IPipe* hashpipe_prepare, ClientMain* client_main,
int filesrv_protocol_version, int incremental_num, logid_t logid, bool with_hashes, const std::vector<std::string>& shares_without_snapshot, bool with_sparse_hashing, server::FileMetadataDownloadThread* file_metadata_download, bool sc_failure_fatal,
FilePathCorrections& filepath_corrections, MaxFileId& max_file_id)
size_t thread_idx,
FilePathCorrections& filepath_corrections, MaxFileId& max_file_id, ActiveDlIds& active_dls_ids)
: fc(fc), fc_chunked(fc_chunked), backuppath(backuppath), backuppath_hashes(backuppath_hashes),
last_backuppath(last_backuppath), last_backuppath_complete(last_backuppath_complete), hashed_transfer(hashed_transfer), save_incomplete_file(save_incomplete_file), clientid(clientid),
clientname(clientname), clientsubname(clientsubname),
@ -53,7 +54,7 @@ ServerDownloadThread::ServerDownloadThread( FileClient& fc, FileClientChunked* f
is_offline(false), client_main(client_main), filesrv_protocol_version(filesrv_protocol_version), skipping(false), queue_size(0),
all_downloads_ok(true), incremental_num(incremental_num), logid(logid), has_timeout(false), with_hashes(with_hashes), with_metadata(client_main->getProtocolVersions().file_meta>0), shares_without_snapshot(shares_without_snapshot),
with_sparse_hashing(with_sparse_hashing), exp_backoff(false), num_embedded_metadata_files(0), file_metadata_download(file_metadata_download), num_issues(0), last_snap_num_issues(0), has_disk_error(false), sc_failure_fatal(sc_failure_fatal),
tmpfile_num(0), filepath_corrections(filepath_corrections), max_file_id(max_file_id)
tmpfile_num(0), filepath_corrections(filepath_corrections), max_file_id(max_file_id), thread_idx(thread_idx), active_dls_ids(active_dls_ids)
{
mutex = Server->createMutex();
cond = Server->createCondition();
@ -202,6 +203,8 @@ void ServerDownloadThread::operator()( void )
}
else if(curr.action==EQueueAction_StopShadowcopy)
{
active_dls_ids.waitBefore(curr.id);
if (!stop_shadowcopy(curr.fn))
{
IScopedLock lock(mutex);
@ -235,7 +238,10 @@ void ServerDownloadThread::operator()( void )
}
}
if(!is_offline && !skipping && client_main->getProtocolVersions().file_meta>0)
active_dls_ids.waitAll();
if(!is_offline && !skipping && client_main->getProtocolVersions().file_meta>0 &&
thread_idx==0)
{
_u32 rc = fc.InformMetadataStreamEnd(server_token, 3);
@ -1515,18 +1521,22 @@ bool ServerDownloadThread::stop_shadowcopy(std::string path)
return fret;
}
bool ServerDownloadThread::sleepQueue()
bool ServerDownloadThread::queueFull()
{
IScopedLock lock(mutex);
if(queue_size>max_queue_size)
{
lock.relock(NULL);
Server->wait(1000);
return true;
}
return false;
}
size_t ServerDownloadThread::queueSize()
{
IScopedLock lock(mutex);
return queue_size;
}
size_t ServerDownloadThread::getNumEmbeddedMetadataFiles()
{
return num_embedded_metadata_files;

View File

@ -137,17 +137,62 @@ namespace
};
}
class ServerDownloadThread : public IThread, public FileClient::QueueCallback, public FileClientChunked::QueueCallback
{
public:
ServerDownloadThread(FileClient& fc, FileClientChunked* fc_chunked, const std::string& backuppath, const std::string& backuppath_hashes, const std::string& last_backuppath, const std::string& last_backuppath_complete, bool hashed_transfer, bool save_incomplete_file, int clientid,
class ActiveDlIds
{
std::auto_ptr<IMutex> mutex;
std::set<size_t> ids;
public:
ActiveDlIds()
: mutex(Server->createMutex())
{}
void addActiveId(size_t id)
{
IScopedLock lock(mutex.get());
ids.insert(id);
}
void removeActiveId(size_t id)
{
IScopedLock lock(mutex.get());
ids.erase(id);
}
void waitAll()
{
IScopedLock lock(mutex.get());
while (!ids.empty())
{
lock.relock(NULL);
Server->wait(100);
lock.relock(mutex.get());
}
}
void waitBefore(size_t id)
{
IScopedLock lock(mutex.get());
while (!ids.empty()
&& *ids.begin() < id)
{
lock.relock(NULL);
Server->wait(100);
lock.relock(mutex.get());
}
}
};
ServerDownloadThread(FileClient& fc, FileClientChunked* fc_chunked, const std::string& backuppath, const std::string& backuppath_hashes, const std::string& last_backuppath,
const std::string& last_backuppath_complete, bool hashed_transfer, bool save_incomplete_file, int clientid,
const std::string& clientname, const std::string& clientsubname,
bool use_tmpfiles, const std::string& tmpfile_path, const std::string& server_token, bool use_reflink, int backupid, bool r_incremental, IPipe* hashpipe_prepare, ClientMain* client_main,
int filesrv_protocol_version, int incremental_num, logid_t logid, bool with_hashes, const std::vector<std::string>& shares_without_snapshot,
bool with_sparse_hashing, server::FileMetadataDownloadThread* file_metadata_download, bool sc_failure_fatal, FilePathCorrections& filepath_corrections,
MaxFileId& max_file_id);
bool with_sparse_hashing, server::FileMetadataDownloadThread* file_metadata_download, bool sc_failure_fatal, size_t thread_idx, FilePathCorrections& filepath_corrections,
MaxFileId& max_file_id, ActiveDlIds& active_dls_ids);
~ServerDownloadThread();
@ -205,7 +250,9 @@ public:
bool shouldBackoff();
bool sleepQueue();
bool queueFull();
size_t queueSize();
size_t getNumEmbeddedMetadataFiles();
@ -308,4 +355,8 @@ private:
size_t tmpfile_num;
MaxFileId& max_file_id;
size_t thread_idx;
ActiveDlIds& active_dls_ids;
};

View File

@ -0,0 +1,258 @@
#include "ServerDownloadThreadGroup.h"
ServerDownloadThreadGroup::ServerDownloadThreadGroup(FileClient& fc, FileClientChunked* fc_chunked, const std::string& backuppath,
const std::string& backuppath_hashes, const std::string& last_backuppath, const std::string& last_backuppath_complete,
bool hashed_transfer, bool save_incomplete_file, int clientid, const std::string& clientname, const std::string& clientsubname,
bool use_tmpfiles, const std::string& tmpfile_path, const std::string& server_token, bool use_reflink, int backupid, bool r_incremental,
IPipe* hashpipe_prepare, ClientMain* client_main, int filesrv_protocol_version, int incremental_num, logid_t logid, bool with_hashes,
const std::vector<std::string>& shares_without_snapshot, bool with_sparse_hashing, server::FileMetadataDownloadThread* file_metadata_download,
bool sc_failure_fatal, size_t n_threads, ServerSettings* server_settings, bool intra_file_diffs, FilePathCorrections& filepath_corrections, MaxFileId& max_file_id)
{
for (size_t i = 0; i < n_threads; ++i)
{
ServerDlThread dl_thread;
FileClient* curr_fc;
FileClientChunked* curr_fc_chunked;
if (i == 0)
{
curr_fc = &fc;
curr_fc_chunked = fc_chunked;
}
else
{
curr_fc = new FileClient(false, client_main->getIdentity(),
client_main->getProtocolVersions().filesrv_protocol_version,
client_main->isOnInternetConnection(), client_main, use_tmpfiles ? NULL : client_main);
_u32 rc = client_main->getClientFilesrvConnection(curr_fc, server_settings, 60000);
if (rc != ERR_CONNECTED)
{
ServerLogger::Log(logid, "Failed to connect FileClient "+convert(i), LL_WARNING);
delete curr_fc;
continue;
}
else
{
curr_fc->setProgressLogCallback(client_main);
}
if (incremental_num > 0 &&
intra_file_diffs)
{
std::auto_ptr<FileClientChunked> new_fc;
if (client_main->getClientChunkedFilesrvConnection(new_fc, server_settings, client_main, 60000))
{
new_fc->setProgressLogCallback(client_main);
new_fc->setDestroyPipe(true);
if (new_fc->hasError())
{
ServerLogger::Log(logid, "Failed to connect chunked FileClient "+convert(i)+ " -1", LL_WARNING);
continue;
}
}
else
{
ServerLogger::Log(logid, "Failed to connect chunked FileClient " + convert(i) + " -2", LL_WARNING);
continue;
}
curr_fc_chunked = new_fc.release();
}
dl_thread.fc = curr_fc;
dl_thread.fc_chunked = curr_fc_chunked;
}
dl_thread.dl_thread = new ServerDownloadThread(*curr_fc, curr_fc_chunked,
backuppath, backuppath_hashes, last_backuppath, last_backuppath_complete,
hashed_transfer, save_incomplete_file, clientid, clientname, clientsubname,
use_tmpfiles, tmpfile_path, server_token, use_reflink, backupid, r_incremental,
hashpipe_prepare, client_main, filesrv_protocol_version, incremental_num, logid,
with_hashes, shares_without_snapshot, with_sparse_hashing, file_metadata_download,
sc_failure_fatal, i, filepath_corrections, max_file_id, active_dls_ids);
tickets.push_back(Server->getThreadPool()->execute(dl_thread.dl_thread, "fbackup load" + convert(i)));
}
}
ServerDownloadThreadGroup::~ServerDownloadThreadGroup()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
ServerDlThread& dl_thread = dl_threads[i];
if (i > 0)
{
delete dl_thread.fc;
delete dl_thread.fc_chunked;
}
delete dl_thread.dl_thread;
}
}
void ServerDownloadThreadGroup::queueSkip()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
dl_threads[i].dl_thread->queueSkip();
}
}
bool ServerDownloadThreadGroup::sleepQueue()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (!dl_threads[i].dl_thread->queueFull())
return false;
}
Server->wait(100);
return true;
}
bool ServerDownloadThreadGroup::isOffline()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (dl_threads[i].dl_thread->isOffline())
return true;
}
return false;
}
void ServerDownloadThreadGroup::addToQueueStartShadowcopy(const std::string& fn)
{
getMinQueued()->addToQueueStartShadowcopy(fn);
}
void ServerDownloadThreadGroup::addToQueueStopShadowcopy(const std::string& fn)
{
getMinQueued()->addToQueueStopShadowcopy(fn);
}
void ServerDownloadThreadGroup::queueStop()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
dl_threads[i].dl_thread->queueStop();
}
}
void ServerDownloadThreadGroup::addToQueueFull(size_t id, const std::string& fn, const std::string& short_fn, const std::string& curr_path, const std::string& os_path, _i64 predicted_filesize, const FileMetadata& metadata, bool is_script, bool metadata_only, size_t folder_items, const std::string& sha_dig, bool at_front_postpone_quitstop, unsigned int p_script_random, std::string display_fn, bool write_metadata)
{
getMinQueued()->addToQueueFull(id, fn, short_fn, curr_path,
os_path, predicted_filesize, metadata, is_script, metadata_only, folder_items, sha_dig,
at_front_postpone_quitstop, p_script_random, display_fn, write_metadata);
}
void ServerDownloadThreadGroup::addToQueueChunked(size_t id, const std::string& fn, const std::string& short_fn, const std::string& curr_path, const std::string& os_path, _i64 predicted_filesize, const FileMetadata& metadata, bool is_script, const std::string& sha_dig, unsigned int p_script_random, std::string display_fn)
{
getMinQueued()->addToQueueChunked(id, fn, short_fn,
curr_path, os_path, predicted_filesize, metadata, is_script,
sha_dig, p_script_random, display_fn);
}
size_t ServerDownloadThreadGroup::getNumIssues()
{
size_t ret = 0;
for (size_t i = 0; i < dl_threads.size(); ++i)
{
ret += dl_threads[i].dl_thread->getNumIssues();
}
return ret;
}
bool ServerDownloadThreadGroup::getHasDiskError()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (dl_threads[i].dl_thread->getHasDiskError())
return true;
}
return false;
}
bool ServerDownloadThreadGroup::hasTimeout()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (dl_threads[i].dl_thread->hasTimeout())
return true;
}
return false;
}
bool ServerDownloadThreadGroup::shouldBackoff()
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (dl_threads[i].dl_thread->shouldBackoff())
return true;
}
return false;
}
size_t ServerDownloadThreadGroup::getNumEmbeddedMetadataFiles()
{
size_t ret = 0;
for (size_t i = 0; i < dl_threads.size(); ++i)
{
ret += dl_threads[i].dl_thread->getNumEmbeddedMetadataFiles();
}
return ret;
}
bool ServerDownloadThreadGroup::deleteTempFolder()
{
return dl_threads[0].dl_thread->deleteTempFolder();
}
bool ServerDownloadThreadGroup::isDownloadOk(size_t id)
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (!dl_threads[i].dl_thread->isDownloadOk(id))
return false;
}
return true;
}
bool ServerDownloadThreadGroup::isDownloadPartial(size_t id)
{
for (size_t i = 0; i < dl_threads.size(); ++i)
{
if (dl_threads[i].dl_thread->isDownloadPartial(id))
return true;
}
return false;
}
size_t ServerDownloadThreadGroup::getMaxOkId()
{
size_t max_ok_id = dl_threads[0].dl_thread->getMaxOkId();
for (size_t i = 1; i < dl_threads.size(); ++i)
{
max_ok_id = (std::max)(max_ok_id, dl_threads[i].dl_thread->getMaxOkId());
}
return max_ok_id;
}
bool ServerDownloadThreadGroup::join(int waitms)
{
return Server->getThreadPool()->waitFor(tickets, waitms);
}
ServerDownloadThread* ServerDownloadThreadGroup::getMinQueued()
{
ServerDownloadThread* ret = dl_threads[0].dl_thread;
size_t queue_size = dl_threads[0].dl_thread->queueSize();
for (size_t i = 1; i < dl_threads.size(); ++i)
{
size_t curr_queue_size = dl_threads[i].dl_thread->queueSize();
if (curr_queue_size < queue_size)
{
queue_size = curr_queue_size;
ret = dl_threads[i].dl_thread;
}
}
return ret;
}

View File

@ -0,0 +1,70 @@
#pragma once
#include "ServerDownloadThread.h"
class ServerDownloadThreadGroup
{
public:
ServerDownloadThreadGroup(FileClient& fc, FileClientChunked* fc_chunked, const std::string& backuppath, const std::string& backuppath_hashes, const std::string& last_backuppath, const std::string& last_backuppath_complete, bool hashed_transfer, bool save_incomplete_file, int clientid,
const std::string& clientname, const std::string& clientsubname,
bool use_tmpfiles, const std::string& tmpfile_path, const std::string& server_token, bool use_reflink, int backupid, bool r_incremental, IPipe* hashpipe_prepare, ClientMain* client_main,
int filesrv_protocol_version, int incremental_num, logid_t logid, bool with_hashes, const std::vector<std::string>& shares_without_snapshot,
bool with_sparse_hashing, server::FileMetadataDownloadThread* file_metadata_download, bool sc_failure_fatal, size_t n_threads, ServerSettings* server_settings, bool intra_file_diffs,
FilePathCorrections& filepath_corrections, MaxFileId& max_file_id);
~ServerDownloadThreadGroup();
void queueSkip();
bool sleepQueue();
bool isOffline();
void addToQueueStartShadowcopy(const std::string& fn);
void addToQueueStopShadowcopy(const std::string& fn);
void queueStop();
void addToQueueFull(size_t id, const std::string& fn, const std::string& short_fn, const std::string& curr_path, const std::string& os_path,
_i64 predicted_filesize, const FileMetadata& metadata, bool is_script, bool metadata_only, size_t folder_items, const std::string& sha_dig,
bool at_front_postpone_quitstop = false, unsigned int p_script_random = 0, std::string display_fn = std::string(), bool write_metadata = false);
void addToQueueChunked(size_t id, const std::string& fn, const std::string& short_fn, const std::string& curr_path,
const std::string& os_path, _i64 predicted_filesize, const FileMetadata& metadata, bool is_script, const std::string& sha_dig, unsigned int p_script_random = 0, std::string display_fn = std::string());
size_t getNumIssues();
bool getHasDiskError();
bool hasTimeout();
bool shouldBackoff();
size_t getNumEmbeddedMetadataFiles();
bool deleteTempFolder();
bool isDownloadOk(size_t id);
bool isDownloadPartial(size_t id);
size_t getMaxOkId();
bool join(int waitms);
private:
ServerDownloadThread* getMinQueued();
ServerDownloadThread::ActiveDlIds active_dls_ids;
struct ServerDlThread
{
ServerDownloadThread* dl_thread;
FileClient* fc;
FileClientChunked* fc_chunked;
};
std::vector<ServerDlThread> dl_threads;
std::vector<THREADPOOL_TICKET> tickets;
};

View File

@ -14,7 +14,7 @@
#include "../stringtools.h"
#include "server_settings.h"
#include "database.h"
#include "ServerDownloadThread.h"
#include "ServerDownloadThreadGroup.h"
#include "server_log.h"
#include "dao/ServerBackupDao.h"
#include "dao/ServerFilesDao.h"
@ -169,7 +169,7 @@ public:
if(server_download.get())
{
server_download->queueStop();
Server->getThreadPool()->waitFor(server_download_ticket);
server_download->join(-1);
}
if(has_fullpath_entryid_mapping_table)
@ -714,14 +714,12 @@ private:
void constructServerDownloadThread()
{
std::vector<std::string> shares_without_snapshot;
server_download.reset(new ServerDownloadThread(*fileclient.get(),
server_download.reset(new ServerDownloadThreadGroup(*fileclient.get(),
fileclient_chunked.get(), continuous_path,
continuous_hash_path, continuous_path, std::string(), hashed_transfer_full,
false, clientid, clientname, std::string(), use_tmpfiles, tmpfile_path, server_token,
use_reflink, backupid, true, hashpipe_prepare, client_main, client_main->getProtocolVersions().file_protocol_version,
0, logid, true, shares_without_snapshot, true, NULL, false, filepath_corrections, max_file_id));
server_download_ticket = Server->getThreadPool()->execute(server_download.get(), "backup download");
0, logid, true, shares_without_snapshot, true, NULL, false, 1, server_settings.get(), true, filepath_corrections, max_file_id));
}
bool execMod(SChange& change)
@ -940,8 +938,7 @@ private:
std::auto_ptr<ServerSettings> server_settings;
std::auto_ptr<ServerDownloadThread> server_download;
THREADPOOL_TICKET server_download_ticket;
std::auto_ptr<ServerDownloadThreadGroup> server_download;
std::deque<SQueueItem> dl_queue;

View File

@ -74,11 +74,6 @@ BackupServerHash::BackupServerHash(IPipe *pPipe, int pClientid, bool use_snapsho
BackupServerHash::~BackupServerHash(void)
{
if(pipe!=NULL)
{
Server->destroy(pipe);
}
delete fileindex;
}
@ -121,6 +116,7 @@ void BackupServerHash::operator()(void)
working=true;
if(data=="exit")
{
pipe->Write("exit");
deinitDatabase();
Server->Log("server_hash Thread finished - normal");
Server->destroyDatabases(Server->getThreadID());

View File

@ -64,7 +64,6 @@ BackupServerPrepareHash::BackupServerPrepareHash(IPipe *pPipe, IPipe *pOutput, i
BackupServerPrepareHash::~BackupServerPrepareHash(void)
{
Server->destroy(pipe);
}
void BackupServerPrepareHash::operator()(void)
@ -77,6 +76,7 @@ void BackupServerPrepareHash::operator()(void)
if(data=="exit")
{
output->Write("exit");
pipe->Write("exit");
Server->Log("server_prepare_hash Thread finished (exit)");
delete this;
return;

View File

@ -569,6 +569,15 @@ void ServerSettings::readSettingsDefault(ISettingsReader* settings_default,
settings->local_compress = true;
readBoolClientSetting(q_get_client_setting, "local_compress", &settings->local_compress, false);
settings->download_threads = 1;
readIntClientSetting(q_get_client_setting, "download_threads", &settings->download_threads, false);
settings->hash_threads = 1;
readIntClientSetting(q_get_client_setting, "hash_threads", &settings->hash_threads, false);
settings->client_hash_threads = 1;
readIntClientSetting(q_get_client_setting, "client_hash_threads", &settings->client_hash_threads, false);
settings->image_compress_threads = 0;
readIntClientSetting(q_get_client_setting, "image_compress_threads", &settings->image_compress_threads, false);
}
void ServerSettings::readSettingsClient(ISettingsReader* settings_client, IQuery* q_get_client_setting)
@ -687,6 +696,11 @@ void ServerSettings::readSettingsClient(ISettingsReader* settings_client, IQuery
readStringClientSetting(q_get_client_setting, "archive", std::string("&"), &settings->archive, false);
readStringClientSetting(q_get_client_setting, "client_settings_tray_access_pw", std::string(), &settings->client_settings_tray_access_pw, false);
readIntClientSetting(q_get_client_setting, "download_threads", &settings->download_threads, false);
readIntClientSetting(q_get_client_setting, "hash_threads", &settings->hash_threads, false);
readIntClientSetting(q_get_client_setting, "client_hash_threads", &settings->client_hash_threads, false);
readIntClientSetting(q_get_client_setting, "image_compress_threads", &settings->image_compress_threads, false);
}
void ServerSettings::readStringClientSetting(IQuery * q_get_client_setting, int clientid, const std::string & name, const std::string & merge_sep, std::string * output, bool allow_client_value)

View File

@ -146,6 +146,10 @@ struct SSettings
std::string client_settings_tray_access_pw;
bool local_encrypt;
bool local_compress;
int download_threads;
int hash_threads;
int client_hash_threads;
int image_compress_threads;
};
struct SLDAPSettings

View File

@ -272,6 +272,10 @@ JSON::Object getJSONClientSettings(IDatabase *db, int t_clientid)
SET_SETTING_STR(client_settings_tray_access_pw);
SET_SETTING_BOOL(local_encrypt);
SET_SETTING_BOOL(local_compress);
SET_SETTING_INT(download_threads);
SET_SETTING_INT(hash_threads);
SET_SETTING_INT(client_hash_threads);
SET_SETTING_INT(image_compress_threads);
#undef SET_SETTING
return ret;
}

View File

@ -253,6 +253,7 @@
<ClCompile Include="PhashLoad.cpp" />
<ClCompile Include="restore_client.cpp" />
<ClCompile Include="server.cpp" />
<ClCompile Include="ServerDownloadThreadGroup.cpp" />
<ClCompile Include="serverinterface\add_client.cpp" />
<ClCompile Include="serverinterface\backups.cpp" />
<ClCompile Include="serverinterface\create_zip.cpp" />
@ -375,6 +376,7 @@
<ClInclude Include="PhashLoad.h" />
<ClInclude Include="restore_client.h" />
<ClInclude Include="server.h" />
<ClInclude Include="ServerDownloadThreadGroup.h" />
<ClInclude Include="serverinterface\actions.h" />
<ClInclude Include="serverinterface\action_header.h" />
<ClInclude Include="serverinterface\backups.h" />

View File

@ -396,6 +396,9 @@
<ClCompile Include="..\urbackupcommon\WebSocketPipe.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="ServerDownloadThreadGroup.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="action_header.h">
@ -704,5 +707,8 @@
<ClInclude Include="..\urbackupcommon\WebSocketPipe.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="ServerDownloadThreadGroup.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
</Project>

File diff suppressed because one or more lines are too long

View File

@ -4167,7 +4167,11 @@ g.settings_list=[
"archive",
"client_settings_tray_access_pw",
"local_encrypt",
"local_compress"
"local_compress",
"download_threads",
"hash_threads",
"client_hash_threads",
"image_compress_threads"
];
g.general_settings_list=[
"backupfolder",

View File

@ -868,6 +868,34 @@
</div>
<div id="client_settings_tray_access_pw_sw" style="display: inline"></div>
</div>
<div class="form-group">
<label class="col-sm-4 control-label" for="download_threads">Beta: {tNumber of parallel file download threads per file backup}</label>
<div class="col-sm-6">
<label><input type="text" class="form-control" id="download_threads" value="{download_threads}"/></label>
</div>
<div id="download_threads_sw" style="display: inline"></div>
</div>
<div class="form-group">
<label class="col-sm-4 control-label" for="hash_threads">Beta: {tNumber of parallel server file hash threads per file backup}</label>
<div class="col-sm-6">
<label><input type="text" class="form-control" id="hash_threads" value="{hash_threads}"/></label>
</div>
<div id="hash_threads_sw" style="display: inline"></div>
</div>
<div class="form-group">
<label class="col-sm-4 control-label" for="client_hash_threads">Beta: {tNumber of parallel client file hash threads per file backup}</label>
<div class="col-sm-6">
<label><input type="text" class="form-control" id="client_hash_threads" value="{client_hash_threads}"/></label>
</div>
<div id="client_hash_threads_sw" style="display: inline"></div>
</div>
<div class="form-group">
<label class="col-sm-4 control-label" for="image_compress_threads">Beta: {tNumber of threads to use for VHDZ compression (0: auto)}</label>
<div class="col-sm-6">
<label><input type="text" class="form-control" id="image_compress_threads" value="{image_compress_threads}"/></label>
</div>
<div id="image_compress_threads_sw" style="display: inline"></div>
</div>
</form>
</div>
</div>