mirror of
https://github.com/uroni/urbackup_backend.git
synced 2025-10-26 11:36:50 +00:00
554 lines
13 KiB
C++
554 lines
13 KiB
C++
/*************************************************************************
|
|
* UrBackup - Client/Server backup system
|
|
* Copyright (C) 2011-2016 Martin Raiber
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU Affero General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
**************************************************************************/
|
|
|
|
#ifndef NO_ZSTD_COMPRESSION
|
|
|
|
#include "CompressedPipeZstd.h"
|
|
#include "../Interface/Server.h"
|
|
#include "../Interface/Mutex.h"
|
|
#include <limits.h>
|
|
#include <memory.h>
|
|
#include <string.h>
|
|
#include "../stringtools.h"
|
|
#include <assert.h>
|
|
#include <stdexcept>
|
|
#include <assert.h>
|
|
#include "InternetServicePipe2.h"
|
|
#include "os_functions.h"
|
|
|
|
#define VLOG(x)
|
|
|
|
|
|
const size_t max_send_size=20000;
|
|
const size_t output_incr_size=8192;
|
|
const size_t output_max_size=32*1024;
|
|
|
|
CompressedPipeZstd::CompressedPipeZstd(IPipe *cs, int compression_level, int threads)
|
|
: cs(cs), has_error(false),
|
|
uncompressed_sent_bytes(0), uncompressed_received_bytes(0), sent_flushes(0),
|
|
input_buffer_size(0), read_mutex(Server->createMutex()), write_mutex(Server->createMutex()),
|
|
last_send_time(Server->getTimeMS()),
|
|
inf_stream(ZSTD_createDStream()),
|
|
def_stream(ZSTD_createCCtx())
|
|
{
|
|
comp_buffer.resize(8192);
|
|
input_buffer.resize(16384);
|
|
destroy_cs=false;
|
|
|
|
if(inf_stream==NULL)
|
|
{
|
|
throw std::runtime_error("Error initializing compression stream");
|
|
}
|
|
if(def_stream==NULL)
|
|
{
|
|
throw std::runtime_error("Error initializing decompression stream");
|
|
}
|
|
|
|
size_t err = ZSTD_CCtx_setParameter(def_stream, ZSTD_c_compressionLevel, compression_level);
|
|
|
|
if (ZSTD_isError(err))
|
|
{
|
|
throw std::runtime_error(std::string("Error setting zstd compression level. ") + ZSTD_getErrorName(err));
|
|
}
|
|
|
|
/*if (threads == -1)
|
|
{
|
|
threads = static_cast<int>(os_get_num_cpus());
|
|
}
|
|
|
|
if (threads > 1)
|
|
{
|
|
size_t err = ZSTD_CCtx_setParameter(def_stream, ZSTD_c_nbWorkers, threads);
|
|
|
|
if (ZSTD_isError(err))
|
|
{
|
|
throw std::runtime_error(std::string("Error setting zstd workers. ") + ZSTD_getErrorName(err));
|
|
}
|
|
}*/
|
|
}
|
|
|
|
CompressedPipeZstd::~CompressedPipeZstd(void)
|
|
{
|
|
ZSTD_freeDStream(inf_stream);
|
|
ZSTD_freeCCtx(def_stream);
|
|
|
|
if(destroy_cs)
|
|
{
|
|
Server->destroy(cs);
|
|
}
|
|
}
|
|
|
|
size_t CompressedPipeZstd::Read(char *buffer, size_t bsize, int timeoutms)
|
|
{
|
|
IScopedLock lock(read_mutex.get());
|
|
VLOG(Server->Log("Read bsize=" + convert(bsize) + " timeoutms=" + convert(timeoutms)+" input_buffer_size="+convert(input_buffer_size), LL_DEBUG));
|
|
|
|
if(input_buffer_size>0)
|
|
{
|
|
size_t rc = ProcessToBuffer(buffer, bsize, true);
|
|
if(rc>0)
|
|
{
|
|
return rc;
|
|
}
|
|
else if(input_buffer_size==input_buffer.size())
|
|
{
|
|
input_buffer.resize(input_buffer.size()+output_incr_size);
|
|
}
|
|
}
|
|
|
|
if(timeoutms==0)
|
|
{
|
|
size_t rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
|
|
if(rc==0)
|
|
return 0;
|
|
|
|
input_buffer_size+=rc;
|
|
return ProcessToBuffer(buffer, bsize, false);
|
|
}
|
|
else if(timeoutms==-1)
|
|
{
|
|
size_t rc;
|
|
do
|
|
{
|
|
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
|
|
if(rc==0)
|
|
return 0;
|
|
if(has_error)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
input_buffer_size += rc;
|
|
rc = ProcessToBuffer(buffer, bsize, false);
|
|
}
|
|
while(rc==0);
|
|
return rc;
|
|
}
|
|
|
|
int64 starttime=Server->getTimeMS();
|
|
size_t rc=0;
|
|
do
|
|
{
|
|
int left=timeoutms-static_cast<int>(Server->getTimeMS()-starttime);
|
|
if (left < 0)
|
|
{
|
|
break;
|
|
}
|
|
|
|
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, left);
|
|
if(rc==0)
|
|
return 0;
|
|
if(has_error)
|
|
{
|
|
return 0;
|
|
}
|
|
input_buffer_size += rc;
|
|
rc = ProcessToBuffer(buffer, bsize, false);
|
|
}
|
|
while(rc==0 && Server->getTimeMS()-starttime<static_cast<int64>(timeoutms));
|
|
|
|
return rc;
|
|
}
|
|
|
|
size_t CompressedPipeZstd::ProcessToBuffer(char *buffer, size_t bsize, bool fromLast)
|
|
{
|
|
VLOG(Server->Log("bsize=" + convert(bsize) + " fromLast=" + convert(fromLast), LL_DEBUG));
|
|
bool set_out=false;
|
|
if(fromLast)
|
|
{
|
|
set_out=true;
|
|
|
|
ZSTD_outBuffer outBuffer;
|
|
outBuffer.dst = buffer;
|
|
outBuffer.size = bsize;
|
|
outBuffer.pos = 0;
|
|
|
|
VLOG(Server->Log("ZSTD_decompressStream(1) avail_in=" + convert(inf_in_last.size - inf_in_last.pos) + " avail_out=" + convert(bsize), LL_DEBUG));
|
|
|
|
size_t rc = ZSTD_decompressStream(inf_stream, &outBuffer, &inf_in_last);
|
|
|
|
assert(bsize >= outBuffer.size - outBuffer.pos);
|
|
size_t used = outBuffer.pos;
|
|
uncompressed_received_bytes+=used;
|
|
|
|
VLOG(Server->Log("rc=" + convert(rc) + " used=" + convert(used) + " avail_in = " + convert(inf_in_last.size - inf_in_last.pos) + " avail_out = " + convert(outBuffer.size - outBuffer.pos), LL_DEBUG));
|
|
|
|
if(ZSTD_isError(rc))
|
|
{
|
|
Server->Log("Error decompressing stream(1): " + convert(rc)
|
|
+ " Err: "+ZSTD_getErrorName(rc), LL_ERROR);
|
|
has_error=true;
|
|
return 0;
|
|
}
|
|
|
|
if(inf_in_last.size==inf_in_last.pos && outBuffer.pos!= outBuffer.size)
|
|
{
|
|
input_buffer_size=0;
|
|
}
|
|
|
|
return used;
|
|
}
|
|
|
|
inf_in_last.src = input_buffer.data();
|
|
inf_in_last.pos = 0;
|
|
inf_in_last.size = input_buffer_size;
|
|
|
|
ZSTD_outBuffer outBuffer;
|
|
outBuffer.dst = buffer;
|
|
outBuffer.size = bsize;
|
|
outBuffer.pos = 0;
|
|
|
|
VLOG(Server->Log("ZSTD_decompressStream(2) avail_in=" + convert(input_buffer_size) + " avail_out=" + convert(bsize), LL_DEBUG));
|
|
size_t rc = ZSTD_decompressStream(inf_stream, &outBuffer, &inf_in_last);
|
|
|
|
size_t used = outBuffer.pos;
|
|
VLOG(Server->Log("rc=" + convert(rc) + " used=" + convert(used)+" avail_in = " + convert(inf_in_last.size - inf_in_last.pos) + " avail_out = " + convert(outBuffer.size - outBuffer.pos), LL_DEBUG));
|
|
uncompressed_received_bytes+=used;
|
|
|
|
if (ZSTD_isError(rc))
|
|
{
|
|
Server->Log("Error decompressing stream(2): "+convert(rc)
|
|
+ " Err: " + ZSTD_getErrorName(rc), LL_ERROR);
|
|
has_error=true;
|
|
return 0;
|
|
}
|
|
|
|
if(inf_in_last.size == inf_in_last.pos && outBuffer.pos != outBuffer.size)
|
|
{
|
|
input_buffer_size=0;
|
|
}
|
|
|
|
return used;
|
|
}
|
|
|
|
|
|
void CompressedPipeZstd::ProcessToString(std::string* ret, bool fromLast )
|
|
{
|
|
size_t data_pos = 0;
|
|
do
|
|
{
|
|
if(data_pos+output_incr_size>ret->size())
|
|
{
|
|
ret->resize(ret->size()+output_incr_size);
|
|
}
|
|
|
|
size_t avail = ret->size()-data_pos;
|
|
size_t used = ProcessToBuffer(&(*ret)[data_pos], avail, fromLast);
|
|
|
|
if(used<avail)
|
|
{
|
|
ret->resize(ret->size()-(avail-used));
|
|
}
|
|
else if(ret->size()>output_max_size)
|
|
{
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
data_pos+=avail;
|
|
}
|
|
|
|
fromLast = true;
|
|
|
|
} while (input_buffer_size!=0);
|
|
}
|
|
|
|
bool CompressedPipeZstd::Write(const char *buffer, size_t bsize, int timeoutms, bool flush)
|
|
{
|
|
IScopedLock lock(write_mutex.get());
|
|
|
|
assert(buffer != NULL || bsize == 0);
|
|
const char* ptr=buffer;
|
|
size_t cbsize=bsize;
|
|
int64 starttime = Server->getTimeMS();
|
|
do
|
|
{
|
|
cbsize=(std::min)(max_send_size, bsize);
|
|
|
|
bsize-=cbsize;
|
|
uncompressed_sent_bytes+=cbsize;
|
|
|
|
bool has_next = bsize>0;
|
|
bool curr_flush = has_next ? false : flush;
|
|
|
|
if (!curr_flush
|
|
&& Server->getTimeMS() - last_send_time > 1000)
|
|
{
|
|
curr_flush = true;
|
|
}
|
|
|
|
if(curr_flush)
|
|
{
|
|
++sent_flushes;
|
|
}
|
|
|
|
|
|
ZSTD_inBuffer inbuf;
|
|
inbuf.src = ptr;
|
|
inbuf.pos = 0;
|
|
inbuf.size = cbsize;
|
|
|
|
ZSTD_outBuffer outbuf;
|
|
size_t rc;
|
|
|
|
do
|
|
{
|
|
outbuf.dst = comp_buffer.data();
|
|
outbuf.pos = 0;
|
|
outbuf.size = comp_buffer.size();
|
|
|
|
VLOG(Server->Log("ZSTD_compressStream2 avail_in=" + convert(inbuf.size-inbuf.pos) + " avail_out=" + convert(outbuf.size)+" flush="+convert(curr_flush), LL_DEBUG));
|
|
rc = ZSTD_compressStream2(def_stream, &outbuf, &inbuf, curr_flush ? ZSTD_e_flush : ZSTD_e_continue);
|
|
|
|
if(ZSTD_isError(rc))
|
|
{
|
|
Server->Log("Error compressing stream: "+convert(rc)
|
|
+ " Err: " + ZSTD_getErrorName(rc), LL_ERROR);
|
|
has_error=true;
|
|
return false;
|
|
}
|
|
|
|
assert(comp_buffer.size() >= outbuf.size - outbuf.pos);
|
|
|
|
size_t used = outbuf.pos;
|
|
|
|
VLOG(Server->Log("rc="+convert(rc)+" used="+convert(used)+" avail_in=" + convert(inbuf.size - inbuf.pos) + " avail_out=" + convert(outbuf.size - outbuf.pos), LL_DEBUG));
|
|
|
|
int curr_timeout = timeoutms;
|
|
|
|
if(curr_timeout>0)
|
|
{
|
|
int64 time_elapsed = Server->getTimeMS()-starttime;
|
|
if(time_elapsed>curr_timeout)
|
|
{
|
|
VLOG(Server->Log("Timeout after compression", LL_DEBUG));
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
curr_timeout-=static_cast<int>(time_elapsed);
|
|
}
|
|
}
|
|
|
|
if(used>0)
|
|
{
|
|
last_send_time = Server->getTimeMS();
|
|
|
|
bool b=cs->Write(comp_buffer.data(), used, curr_timeout, curr_flush);
|
|
if(!b)
|
|
return false;
|
|
}
|
|
else if(!has_next && flush)
|
|
{
|
|
return cs->Flush(curr_timeout);
|
|
}
|
|
|
|
} while(outbuf.pos==outbuf.size || (curr_flush && rc!=0) );
|
|
|
|
ptr+=cbsize;
|
|
|
|
} while(bsize>0);
|
|
|
|
return true;
|
|
}
|
|
|
|
size_t CompressedPipeZstd::Read(std::string *ret, int timeoutms)
|
|
{
|
|
IScopedLock lock(read_mutex.get());
|
|
|
|
if(input_buffer_size>0)
|
|
{
|
|
ProcessToString(ret, true);
|
|
if(!ret->empty())
|
|
{
|
|
return ret->size();
|
|
}
|
|
else if(input_buffer_size==input_buffer.size())
|
|
{
|
|
input_buffer.resize(input_buffer.size()+output_incr_size);
|
|
}
|
|
}
|
|
|
|
if(timeoutms==0)
|
|
{
|
|
size_t rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
|
|
if(rc==0)
|
|
return 0;
|
|
|
|
if(has_error)
|
|
{
|
|
return 0;
|
|
}
|
|
input_buffer_size+=rc;
|
|
ProcessToString(ret, false);
|
|
return ret->size();
|
|
}
|
|
else if(timeoutms==-1)
|
|
{
|
|
size_t rc;
|
|
do
|
|
{
|
|
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
|
|
if(rc==0)
|
|
return 0;
|
|
|
|
if(has_error)
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
input_buffer_size+=rc;
|
|
ProcessToString(ret, false);
|
|
rc=ret->size();
|
|
}
|
|
while(rc==0);
|
|
return rc;
|
|
}
|
|
|
|
int64 starttime=Server->getTimeMS();
|
|
size_t rc;
|
|
do
|
|
{
|
|
int left=timeoutms-static_cast<int>(Server->getTimeMS()-starttime);
|
|
|
|
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, left);
|
|
if(rc==0)
|
|
return 0;
|
|
|
|
if(has_error)
|
|
{
|
|
return 0;
|
|
}
|
|
input_buffer_size+=rc;
|
|
ProcessToString(ret, false);
|
|
rc=ret->size();
|
|
}
|
|
while(rc==0 && Server->getTimeMS()-starttime<static_cast<int64>(timeoutms));
|
|
|
|
return rc;
|
|
}
|
|
|
|
bool CompressedPipeZstd::Write(const std::string &str, int timeoutms, bool flush)
|
|
{
|
|
return Write(str.c_str(), str.size(), timeoutms, flush);
|
|
}
|
|
|
|
/**
|
|
* @param timeoutms -1 for blocking >=0 to block only for x ms. Default: nonblocking
|
|
*/
|
|
bool CompressedPipeZstd::isWritable(int timeoutms)
|
|
{
|
|
return cs->isWritable(timeoutms);
|
|
}
|
|
|
|
bool CompressedPipeZstd::isReadable(int timeoutms)
|
|
{
|
|
if(input_buffer_size>0)
|
|
return true;
|
|
else
|
|
return cs->isReadable(timeoutms);
|
|
}
|
|
|
|
bool CompressedPipeZstd::hasError(void)
|
|
{
|
|
return cs->hasError() || has_error;
|
|
}
|
|
|
|
void CompressedPipeZstd::shutdown(void)
|
|
{
|
|
cs->shutdown();
|
|
}
|
|
|
|
size_t CompressedPipeZstd::getNumWaiters() {
|
|
return cs->getNumWaiters();
|
|
}
|
|
|
|
size_t CompressedPipeZstd::getNumElements(void)
|
|
{
|
|
return cs->getNumElements();
|
|
}
|
|
|
|
void CompressedPipeZstd::destroyBackendPipeOnDelete(bool b)
|
|
{
|
|
destroy_cs=b;
|
|
}
|
|
|
|
IPipe *CompressedPipeZstd::getRealPipe(void)
|
|
{
|
|
return cs;
|
|
}
|
|
|
|
void CompressedPipeZstd::addThrottler(IPipeThrottler *throttler)
|
|
{
|
|
cs->addThrottler(throttler);
|
|
}
|
|
|
|
void CompressedPipeZstd::addOutgoingThrottler(IPipeThrottler *throttler)
|
|
{
|
|
cs->addOutgoingThrottler(throttler);
|
|
}
|
|
|
|
void CompressedPipeZstd::addIncomingThrottler(IPipeThrottler *throttler)
|
|
{
|
|
cs->addIncomingThrottler(throttler);
|
|
}
|
|
|
|
_i64 CompressedPipeZstd::getTransferedBytes(void)
|
|
{
|
|
return cs->getTransferedBytes();
|
|
}
|
|
|
|
void CompressedPipeZstd::resetTransferedBytes(void)
|
|
{
|
|
cs->resetTransferedBytes();
|
|
}
|
|
|
|
bool CompressedPipeZstd::Flush( int timeoutms/*=-1 */ )
|
|
{
|
|
return Write(NULL, 0, timeoutms, true);
|
|
}
|
|
|
|
int64 CompressedPipeZstd::getUncompressedReceivedBytes()
|
|
{
|
|
return uncompressed_received_bytes;
|
|
}
|
|
|
|
int64 CompressedPipeZstd::getUncompressedSentBytes()
|
|
{
|
|
return uncompressed_sent_bytes;
|
|
}
|
|
|
|
int64 CompressedPipeZstd::getSentFlushes()
|
|
{
|
|
return sent_flushes;
|
|
}
|
|
|
|
_i64 CompressedPipeZstd::getRealTransferredBytes()
|
|
{
|
|
int64 encryption_overhead=0;
|
|
InternetServicePipe2* isp2 = dynamic_cast<InternetServicePipe2*>(getRealPipe());
|
|
if(isp2!=NULL)
|
|
{
|
|
encryption_overhead=isp2->getEncryptionOverheadBytes();
|
|
Server->Log("Encryption overhead: "+PrettyPrintBytes(encryption_overhead));
|
|
}
|
|
|
|
return getUncompressedSentBytes()+getUncompressedReceivedBytes()-encryption_overhead;
|
|
}
|
|
|
|
#endif //NO_ZSTD_COMPRESSION
|