urbackup_backend/urbackupcommon/CompressedPipe2.cpp

527 lines
13 KiB
C++

/*************************************************************************
* UrBackup - Client/Server backup system
* Copyright (C) 2011-2016 Martin Raiber
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
**************************************************************************/
#include "CompressedPipe2.h"
#include "../Interface/Server.h"
#include "../Interface/Mutex.h"
#include <limits.h>
#include <memory.h>
#include <string.h>
#include "../stringtools.h"
#include <assert.h>
#include <stdexcept>
#include <assert.h>
#include "InternetServicePipe2.h"
#define VLOG(x)
const size_t max_send_size=20000;
const size_t output_incr_size=8192;
const size_t output_max_size=32*1024;
CompressedPipe2::CompressedPipe2(IPipe *cs, int compression_level)
: cs(cs), has_error(false),
uncompressed_sent_bytes(0), uncompressed_received_bytes(0), sent_flushes(0),
input_buffer_size(0), read_mutex(Server->createMutex()), write_mutex(Server->createMutex()),
last_send_time(Server->getTimeMS())
{
comp_buffer.resize(4096);
input_buffer.resize(16384);
destroy_cs=false;
memset(&inf_stream, 0, sizeof(z_stream));
memset(&def_stream, 0, sizeof(z_stream));
if(deflateInit(&def_stream, compression_level)!=Z_OK)
{
throw std::runtime_error("Error initializing compression stream");
}
if(inflateInit(&inf_stream)!=Z_OK)
{
throw std::runtime_error("Error initializing decompression stream");
}
}
CompressedPipe2::~CompressedPipe2(void)
{
deflateEnd(&def_stream);
inflateEnd(&inf_stream);
if(destroy_cs)
{
Server->destroy(cs);
}
}
size_t CompressedPipe2::Read(char *buffer, size_t bsize, int timeoutms)
{
IScopedLock lock(read_mutex.get());
VLOG(Server->Log("Read bsize=" + convert(bsize) + " timeoutms=" + convert(timeoutms)+" input_buffer_size="+convert(input_buffer_size), LL_DEBUG));
if(input_buffer_size>0)
{
size_t rc = ProcessToBuffer(buffer, bsize, true);
if(rc>0)
{
return rc;
}
else if(input_buffer_size==input_buffer.size())
{
input_buffer.resize(input_buffer.size()+output_incr_size);
}
}
if(timeoutms==0)
{
size_t rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
if(rc==0)
return 0;
input_buffer_size+=rc;
return ProcessToBuffer(buffer, bsize, false);
}
else if(timeoutms==-1)
{
size_t rc;
do
{
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
if(rc==0)
return 0;
if(has_error)
{
return 0;
}
input_buffer_size += rc;
rc = ProcessToBuffer(buffer, bsize, false);
}
while(rc==0);
return rc;
}
int64 starttime=Server->getTimeMS();
size_t rc=0;
do
{
int left=timeoutms-static_cast<int>(Server->getTimeMS()-starttime);
if (left < 0)
{
break;
}
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, left);
if(rc==0)
return 0;
if(has_error)
{
return 0;
}
input_buffer_size += rc;
rc = ProcessToBuffer(buffer, bsize, false);
}
while(rc==0 && Server->getTimeMS()-starttime<static_cast<int64>(timeoutms));
return rc;
}
size_t CompressedPipe2::ProcessToBuffer(char *buffer, size_t bsize, bool fromLast)
{
VLOG(Server->Log("bsize=" + convert(bsize) + " fromLast=" + convert(fromLast), LL_DEBUG));
bool set_out=false;
if(fromLast)
{
inf_stream.next_out=reinterpret_cast<unsigned char*>(buffer);
inf_stream.avail_out=static_cast<unsigned int>(bsize);
set_out=true;
VLOG(Server->Log("inflate(1) avail_in=" + convert(inf_stream.avail_in) + " avail_out=" + convert(inf_stream.avail_out), LL_DEBUG));
int rc = inflate(&inf_stream, Z_SYNC_FLUSH);
assert(bsize >= inf_stream.avail_out);
size_t used = bsize - inf_stream.avail_out;
uncompressed_received_bytes+=used;
VLOG(Server->Log("rc=" + convert(rc) + " used=" + convert(used) + " avail_in = " + convert(inf_stream.avail_in) + " avail_out = " + convert(inf_stream.avail_out), LL_DEBUG));
if(rc!=Z_OK && rc!=Z_STREAM_END && rc!=Z_BUF_ERROR /*Needs more input*/)
{
Server->Log("Error decompressing stream(1): " + convert(rc)
+ (inf_stream.msg != NULL ? (" Err: " + std::string(inf_stream.msg)) : ""), LL_ERROR);
has_error=true;
return 0;
}
if(inf_stream.avail_in==0 && inf_stream.avail_out!=0)
{
input_buffer_size=0;
}
else if(inf_stream.avail_in==0)
{
inf_stream.next_in=NULL;
}
return used;
}
inf_stream.avail_in = static_cast<unsigned int>(input_buffer_size);
inf_stream.next_in = reinterpret_cast<Bytef*>(input_buffer.data());
if(!set_out)
{
inf_stream.next_out=reinterpret_cast<unsigned char*>(buffer);
inf_stream.avail_out=static_cast<unsigned int>(bsize);
}
VLOG(Server->Log("inflate(2) avail_in=" + convert(inf_stream.avail_in) + " avail_out=" + convert(inf_stream.avail_out), LL_DEBUG));
int rc = inflate(&inf_stream, Z_SYNC_FLUSH);
size_t used = bsize - inf_stream.avail_out;
VLOG(Server->Log("rc=" + convert(rc) + " used=" + convert(used)+" avail_in = " + convert(inf_stream.avail_in) + " avail_out = " + convert(inf_stream.avail_out), LL_DEBUG));
uncompressed_received_bytes+=used;
if(rc!=Z_OK && rc!=Z_STREAM_END && rc != Z_BUF_ERROR /*Needs more input*/)
{
Server->Log("Error decompressing stream(2): "+convert(rc)
+ (inf_stream.msg != NULL ? (" Err: " + std::string(inf_stream.msg)) : ""), LL_ERROR);
has_error=true;
return 0;
}
if(inf_stream.avail_in==0 && inf_stream.avail_out!=0)
{
input_buffer_size=0;
}
else if(inf_stream.avail_in==0)
{
inf_stream.next_in=NULL;
}
return used;
}
void CompressedPipe2::ProcessToString(std::string* ret, bool fromLast )
{
size_t data_pos = 0;
do
{
if(data_pos+output_incr_size>ret->size())
{
ret->resize(ret->size()+output_incr_size);
}
size_t avail = ret->size()-data_pos;
ProcessToBuffer(&(*ret)[data_pos], avail, fromLast);
if(inf_stream.avail_out!=0)
{
ret->resize(ret->size()-inf_stream.avail_out);
}
else if(ret->size()>output_max_size)
{
return;
}
else
{
data_pos+=avail;
}
fromLast = true;
} while (input_buffer_size!=0);
}
bool CompressedPipe2::Write(const char *buffer, size_t bsize, int timeoutms, bool flush)
{
IScopedLock lock(write_mutex.get());
assert(buffer != NULL || bsize == 0);
const char* ptr=buffer;
size_t cbsize=bsize;
int64 starttime = Server->getTimeMS();
do
{
cbsize=(std::min)(max_send_size, bsize);
bsize-=cbsize;
uncompressed_sent_bytes+=cbsize;
bool has_next = bsize>0;
bool curr_flush = has_next ? false : flush;
if (!curr_flush
&& Server->getTimeMS() - last_send_time > 1000)
{
curr_flush = true;
}
if(curr_flush)
{
++sent_flushes;
}
def_stream.avail_in = static_cast<unsigned int>(cbsize);
def_stream.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(ptr));
do
{
def_stream.avail_out = static_cast<unsigned int>(comp_buffer.size());
def_stream.next_out = reinterpret_cast<unsigned char*>(comp_buffer.data());
VLOG(Server->Log("deflate avail_in=" + convert(def_stream.avail_in) + " avail_out=" + convert(def_stream.avail_out)+" flush="+convert(curr_flush), LL_DEBUG));
int rc = deflate(&def_stream, curr_flush ? Z_SYNC_FLUSH : Z_NO_FLUSH);
if(rc!=Z_OK && rc!=Z_STREAM_END && rc!=Z_BUF_ERROR)
{
Server->Log("Error compressing stream: "+convert(rc)
+ (def_stream.msg != NULL ? (" Err: " + std::string(def_stream.msg)) : ""), LL_ERROR);
has_error=true;
return false;
}
assert(comp_buffer.size() >= def_stream.avail_out);
size_t used = comp_buffer.size() - def_stream.avail_out;
VLOG(Server->Log("rc="+convert(rc)+" used="+convert(used)+" avail_in=" + convert(def_stream.avail_in) + " avail_out=" + convert(def_stream.avail_out), LL_DEBUG));
int curr_timeout = timeoutms;
if(curr_timeout>0)
{
int64 time_elapsed = Server->getTimeMS()-starttime;
if(time_elapsed>curr_timeout)
{
VLOG(Server->Log("Timeout after compression", LL_DEBUG));
return false;
}
else
{
curr_timeout-=static_cast<int>(time_elapsed);
}
}
if(used>0)
{
last_send_time = Server->getTimeMS();
bool b=cs->Write(comp_buffer.data(), used, curr_timeout, curr_flush);
if(!b)
return false;
}
else if(!has_next && flush)
{
return cs->Flush(curr_timeout);
}
} while(def_stream.avail_out==0);
ptr+=cbsize;
} while(bsize>0);
return true;
}
size_t CompressedPipe2::Read(std::string *ret, int timeoutms)
{
IScopedLock lock(read_mutex.get());
if(input_buffer_size>0)
{
ProcessToString(ret, true);
if(!ret->empty())
{
return ret->size();
}
else if(input_buffer_size==input_buffer.size())
{
input_buffer.resize(input_buffer.size()+output_incr_size);
}
}
if(timeoutms==0)
{
size_t rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
if(rc==0)
return 0;
if(has_error)
{
return 0;
}
input_buffer_size+=rc;
ProcessToString(ret, false);
return ret->size();
}
else if(timeoutms==-1)
{
size_t rc;
do
{
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, timeoutms);
if(rc==0)
return 0;
if(has_error)
{
return 0;
}
input_buffer_size+=rc;
ProcessToString(ret, false);
rc=ret->size();
}
while(rc==0);
return rc;
}
int64 starttime=Server->getTimeMS();
size_t rc;
do
{
int left=timeoutms-static_cast<int>(Server->getTimeMS()-starttime);
rc=cs->Read(input_buffer.data()+input_buffer_size, input_buffer.size()-input_buffer_size, left);
if(rc==0)
return 0;
if(has_error)
{
return 0;
}
input_buffer_size+=rc;
ProcessToString(ret, false);
rc=ret->size();
}
while(rc==0 && Server->getTimeMS()-starttime<static_cast<int64>(timeoutms));
return rc;
}
bool CompressedPipe2::Write(const std::string &str, int timeoutms, bool flush)
{
return Write(str.c_str(), str.size(), timeoutms, flush);
}
/**
* @param timeoutms -1 for blocking >=0 to block only for x ms. Default: nonblocking
*/
bool CompressedPipe2::isWritable(int timeoutms)
{
return cs->isWritable(timeoutms);
}
bool CompressedPipe2::isReadable(int timeoutms)
{
if(input_buffer_size>0)
return true;
else
return cs->isReadable(timeoutms);
}
bool CompressedPipe2::hasError(void)
{
return cs->hasError() || has_error;
}
void CompressedPipe2::shutdown(void)
{
cs->shutdown();
}
size_t CompressedPipe2::getNumWaiters() {
return cs->getNumWaiters();
}
size_t CompressedPipe2::getNumElements(void)
{
return cs->getNumElements();
}
void CompressedPipe2::destroyBackendPipeOnDelete(bool b)
{
destroy_cs=b;
}
IPipe *CompressedPipe2::getRealPipe(void)
{
return cs;
}
void CompressedPipe2::addThrottler(IPipeThrottler *throttler)
{
cs->addThrottler(throttler);
}
void CompressedPipe2::addOutgoingThrottler(IPipeThrottler *throttler)
{
cs->addOutgoingThrottler(throttler);
}
void CompressedPipe2::addIncomingThrottler(IPipeThrottler *throttler)
{
cs->addIncomingThrottler(throttler);
}
_i64 CompressedPipe2::getTransferedBytes(void)
{
return cs->getTransferedBytes();
}
void CompressedPipe2::resetTransferedBytes(void)
{
cs->resetTransferedBytes();
}
bool CompressedPipe2::Flush( int timeoutms/*=-1 */ )
{
return Write(NULL, 0, timeoutms, true);
}
int64 CompressedPipe2::getUncompressedReceivedBytes()
{
return uncompressed_received_bytes;
}
int64 CompressedPipe2::getUncompressedSentBytes()
{
return uncompressed_sent_bytes;
}
int64 CompressedPipe2::getSentFlushes()
{
return sent_flushes;
}
_i64 CompressedPipe2::getRealTransferredBytes()
{
int64 encryption_overhead=0;
InternetServicePipe2* isp2 = dynamic_cast<InternetServicePipe2*>(getRealPipe());
if(isp2!=NULL)
{
encryption_overhead=isp2->getEncryptionOverheadBytes();
Server->Log("Encryption overhead: "+PrettyPrintBytes(encryption_overhead));
}
return getUncompressedSentBytes()+getUncompressedReceivedBytes()-encryption_overhead;
}