#include "FileClientChunked.h" #include "../../urbackupcommon/fileclient/data.h" #include "../../Interface/Server.h" #include "../../Interface/File.h" #include "../../stringtools.h" extern std::string server_identity; const unsigned int chunkhash_file_off=sizeof(_i64); const unsigned int chunkhash_single_size=big_hash_size+small_hash_size*(c_checkpoint_dist/c_small_hash_dist); unsigned int adler32(unsigned int adler, const char *buf, unsigned int len); FileClientChunked::FileClientChunked(IPipe *pipe, CTCPStack *stack) : pipe(pipe), stack(stack), destroy_pipe(false) { has_error=false; } FileClientChunked::FileClientChunked(void) : pipe(NULL), stack(NULL), destroy_pipe(false) { has_error=true; } FileClientChunked::~FileClientChunked(void) { if(pipe!=NULL && destroy_pipe) { Server->destroy(pipe); pipe=NULL; } } _u32 FileClientChunked::GetFilePatch(std::string remotefn, IFile *orig_file, IFile *patchfile, IFile *chunkhashes, IFile *hashoutput) { m_file=NULL; patch_mode=true; m_chunkhashes=chunkhashes; m_hashoutput=hashoutput; m_patchfile=patchfile; m_file=orig_file; patchfile_pos=0; patch_buf_pos=0; return GetFile(remotefn); } _u32 FileClientChunked::GetFileChunked(std::string remotefn, IFile *file, IFile *chunkhashes, IFile *hashoutput) { patch_mode=false; m_file=file; m_chunkhashes=chunkhashes; m_hashoutput=hashoutput; return GetFile(remotefn); } _u32 FileClientChunked::GetFile(std::string remotefn) { getfile_done=false; retval=ERR_SUCCESS; if(pipe==NULL) return ERR_ERROR; int tries=50; _i64 fileoffset=0; m_chunkhashes->Seek(0); _i64 hashfilesize=0; if(m_chunkhashes->Read((char*)&hashfilesize, sizeof(_i64))!=sizeof(_i64) ) return ERR_INT_ERROR; { CWData data; data.addUChar( ID_GET_FILE_BLOCKDIFF ); data.addString( remotefn ); data.addString( server_identity ); data.addInt64( fileoffset ); data.addInt64( hashfilesize ); stack->Send( pipe, data.getDataPtr(), data.getDataSize() ); } next_chunk=0; num_chunks=hashfilesize/c_checkpoint_dist+((hashfilesize%c_checkpoint_dist!=0)?1:0); remote_filesize=-1; num_total_chunks=0; starttime=Server->getTimeMS(); queued_chunks=0; block_for_chunk_start=-1; char buf[BUFFERSIZE]; state=CS_ID_FIRST; do { if(queued_chunksSeek(chunkhash_file_off+next_chunk*chunkhash_single_size); char buf[chunkhash_single_size+2*sizeof(char)+sizeof(_i64)]; buf[0]=ID_BLOCK_REQUEST; *((_i64*)(buf+1))=next_chunk*c_checkpoint_dist; buf[1+sizeof(_i64)]=0; m_chunkhashes->Read(&buf[2*sizeof(char)+sizeof(_i64)], chunkhash_single_size); stack->Send( pipe, buf, chunkhash_single_size+2*sizeof(char)+sizeof(_i64)); char *sptr=&buf[2*sizeof(char)+sizeof(_i64)]; SChunkHashes chhash; memcpy(chhash.big_hash, sptr, big_hash_size); memcpy(chhash.small_hash, sptr+big_hash_size, chunkhash_single_size-big_hash_size); pending_chunks.insert(std::pair<_i64, SChunkHashes>(next_chunk*c_checkpoint_dist, chhash)); } else { CWData data; data.addUChar(ID_BLOCK_REQUEST); data.addInt64(next_chunk*c_checkpoint_dist); data.addChar(1); stack->Send( pipe, data.getDataPtr(), data.getDataSize()); pending_chunks.insert(std::pair<_i64, SChunkHashes>(next_chunk*c_checkpoint_dist, SChunkHashes() )); } ++queued_chunks; ++next_chunk; } } else { if(queued_chunks>0 || remote_filesize==-1) { pipe->isReadable(100); } else { return ERR_SUCCESS; } } size_t rc=pipe->Read(buf, BUFFERSIZE, 0); if(rc==0) { if(pipe->hasError()) { return ERR_CONN_LOST; } } else { starttime=Server->getTimeMS(); bufptr=buf; remaining_bufptr_bytes=rc; while(bufptrgetTimeMS()-starttimeLog("FileClientChunked: Whole block start="+nconvert(block_start), LL_DEBUG); if(pending_chunks.find(block_start)==pending_chunks.end()) { Server->Log("Block not requested.", LL_ERROR); retval=ERR_ERROR; getfile_done=true; return; } file_pos=block_start; if(!m_file->Seek(block_start)) Server->Log("Chunked Transfer: Seeking failed", LL_ERROR); block_for_chunk_start=block_start; msg.getUInt(&whole_block_remaining); state=CS_BLOCK; md5_hash.init(); hash_for_whole_block=false; adler_hash=adler32(0, NULL, 0); adler_remaining=c_chunk_size; block_pos=0; m_hashoutput->Seek(chunkhash_file_off+(block_start/c_checkpoint_dist)*chunkhash_single_size); char tmp[big_hash_size]={}; writeFileRepeat(m_hashoutput, tmp, big_hash_size); }break; case ID_UPDATE_CHUNK: { msg.getInt64(&chunk_start); Hash_upto(chunk_start); msg.getUInt(&adler_remaining); Server->Log("FileClientChunked: Chunk start="+nconvert(chunk_start)+" remaining="+nconvert(adler_remaining), LL_DEBUG); file_pos=chunk_start; _i64 block=chunk_start/c_checkpoint_dist; if(pending_chunks.find(block*c_checkpoint_dist)==pending_chunks.end()) { Server->Log("Chunk not requested.", LL_ERROR); retval=ERR_ERROR; getfile_done=true; return; } m_file->Seek(chunk_start); unsigned int chunknum=(chunk_start%c_checkpoint_dist)/c_chunk_size; m_hashoutput->Seek(chunkhash_file_off+block*chunkhash_single_size +big_hash_size+chunknum*small_hash_size); state=CS_CHUNK; adler_hash=adler32(0, NULL, 0); }break; case ID_NO_CHANGE: { _i64 block_start; msg.getInt64(&block_start); Hash_nochange(block_start); state=CS_ID_FIRST; }break; case ID_BLOCK_HASH: { _i64 block_start; msg.getInt64(&block_start); const char *blockhash=msg.getCurrDataPtr(); Hash_finalize(block_start, blockhash); }break; } } else { if(remaining_bufptr_bytes>0) { memcpy(&packet_buf[packet_buf_off], bufptr, remaining_bufptr_bytes); packet_buf_off+=remaining_bufptr_bytes; need_bytes-=(unsigned int)remaining_bufptr_bytes; } state=CS_ID_ACC; bufptr_bytes_done+=remaining_bufptr_bytes; remaining_bufptr_bytes=0; } } void FileClientChunked::Hash_upto(_i64 chunk_start) { _i64 block_start=(chunk_start/c_checkpoint_dist)*c_checkpoint_dist; if(block_start!=block_for_chunk_start) { block_for_chunk_start=block_start; md5_hash.init(); last_chunk_patches.clear(); patch_buf_pos=0; hash_for_whole_block=false; if(block_start!=chunk_start) { m_file->Seek(block_start); char buf2[BUFFERSIZE]; do { size_t r=m_file->Read(buf2, BUFFERSIZE); if(rLog("Read error in File chunked - 1", LL_ERROR); } block_start+=r; md5_hash.update((unsigned char*)buf2, (unsigned int)r); }while(block_startremote_filesize) dest_pos=remote_filesize; char buf2[BUFFERSIZE]; m_file->Seek(chunk_start); while(chunk_startRead(buf2, BUFFERSIZE); file_pos+=r; chunk_start+=r; md5_hash.update((unsigned char*)buf2, (unsigned int)r); } } block_for_chunk_start=-1; md5_hash.finalize(); } if(memcmp(hash_from_client, md5_hash.raw_digest_int(), big_hash_size)!=0) { if(!hash_for_whole_block) { invalidateLastPatches(); CWData data; data.addUChar(ID_BLOCK_REQUEST); data.addInt64(curr_pos); data.addChar(1); stack->Send( pipe, data.getDataPtr(), data.getDataSize()); } else { retval=ERR_HASH; getfile_done=true; } } else { m_hashoutput->Seek(chunkhash_file_off+(curr_pos/c_checkpoint_dist)*chunkhash_single_size); writeFileRepeat(m_hashoutput, hash_from_client, big_hash_size); std::map<_i64, SChunkHashes>::iterator it=pending_chunks.find(curr_pos); if(it!=pending_chunks.end()) { pending_chunks.erase(it); --queued_chunks; } else { Server->Log("Pending chunk not found -1", LL_ERROR); } } } void FileClientChunked::Hash_nochange(_i64 curr_pos) { std::map<_i64, SChunkHashes>::iterator it=pending_chunks.find(curr_pos); if(it!=pending_chunks.end()) { m_hashoutput->Seek(chunkhash_file_off+(curr_pos/c_checkpoint_dist)*chunkhash_single_size); m_hashoutput->Write(it->second.big_hash, chunkhash_single_size); pending_chunks.erase(it); --queued_chunks; } else { Server->Log("Pending chunk not found -1", LL_ERROR); retval=ERR_ERROR; getfile_done=true; } } void FileClientChunked::State_Block(void) { size_t rbytes=(std::min)(remaining_bufptr_bytes, (size_t)whole_block_remaining); remaining_bufptr_bytes-=rbytes; bufptr_bytes_done+=rbytes; whole_block_remaining-=(unsigned int)rbytes; md5_hash.update((unsigned char*)bufptr, (unsigned int)rbytes); if(!patch_mode) { writeFileRepeat(m_file, bufptr, rbytes); file_pos+=rbytes; } else { writePatch(file_pos, (unsigned int)rbytes, bufptr, false, whole_block_remaining==0); file_pos+=rbytes; } chunk_start+=(unsigned int)rbytes; char *alder_bufptr=bufptr; while(rbytes>0) { size_t adler_bytes=(std::min)((size_t)adler_remaining, rbytes); adler_hash=adler32(adler_hash, alder_bufptr, (unsigned int)adler_bytes); alder_bufptr+=adler_bytes; rbytes-=adler_bytes; adler_remaining-=(unsigned int)adler_bytes; if(adler_remaining==0 || whole_block_remaining==0) { writeFileRepeat(m_hashoutput, (char*)&adler_hash, small_hash_size); adler_hash=adler32(0, NULL, 0); adler_remaining=c_chunk_size; } block_pos+=(unsigned int)adler_bytes; } if(whole_block_remaining==0) { md5_hash.finalize(); hash_for_whole_block=true; m_hashoutput->Seek(chunkhash_file_off+(block_for_chunk_start/c_checkpoint_dist)*chunkhash_single_size); writeFileRepeat(m_hashoutput, (char*)md5_hash.raw_digest_int(), big_hash_size); state=CS_ID_FIRST; } } void FileClientChunked::writeFileRepeat(IFile *f, const char *buf, size_t bsize) { _u32 written=0; _u32 rc; int tries=50; do { rc=f->Write(buf+written, (_u32)(bsize-written)); written+=rc; if(rc==0) { Server->Log("Failed to write to file... waiting... in Chunked File transfer", LL_WARNING); Server->wait(10000); --tries; } } while(written0 || tries>0) ); if(rc==0) { Server->Log("Fatal error writing to file in writeFileRepeat. Write error in Chunked File transfer.", LL_ERROR); } } void FileClientChunked::State_Chunk(void) { size_t rbytes=(std::min)(remaining_bufptr_bytes, (size_t)adler_remaining); adler_remaining-=(unsigned int)rbytes; chunk_start+=rbytes; adler_hash=adler32(adler_hash, bufptr, (unsigned int)rbytes); md5_hash.update((unsigned char*)bufptr, (unsigned int)rbytes); if(!patch_mode) { writeFileRepeat(m_file, bufptr, rbytes); file_pos+=rbytes; } else { writePatch(file_pos, (unsigned int)rbytes, bufptr, true, adler_remaining==0); file_pos+=rbytes; } remaining_bufptr_bytes-=rbytes; bufptr_bytes_done+=rbytes; if(adler_remaining==0) { writeFileRepeat(m_hashoutput, (char*)&adler_hash, small_hash_size); state=CS_ID_FIRST; } } _i64 FileClientChunked::getSize(void) { return remote_filesize; } void FileClientChunked::writePatch(_i64 pos, unsigned int length, char *buf, bool in_chunk, bool last) { if(length<=c_chunk_size-patch_buf_pos && (patch_buf_pos==0 || pos==patch_buf_start+patch_buf_pos) ) { if(buf!=NULL) { memcpy(&patch_buf[patch_buf_pos], buf, length); } if(patch_buf_pos==0) { patch_buf_start=pos; patch_in_chunk=in_chunk; } patch_buf_pos+=length; if(last || patch_buf_pos==c_chunk_size || length==0) { writePatchInt(patch_buf_start, patch_buf_pos, patch_buf, patch_in_chunk); patch_buf_start=0; } } else { if(patch_buf_start!=0) { writePatchInt(patch_buf_start, patch_buf_pos, patch_buf, patch_in_chunk); patch_buf_start=0; } writePatchInt(pos, length, buf, in_chunk); } } void FileClientChunked::writePatchInt(_i64 pos, unsigned int length, char *buf, bool in_chunk) { const unsigned int plen=sizeof(_i64)+sizeof(unsigned int); char pd[plen]; memcpy(pd, &pos, sizeof(_i64)); memcpy(pd+sizeof(_i64), &length, sizeof(unsigned int)); writeFileRepeat(m_patchfile, pd, plen); writeFileRepeat(m_patchfile, buf, length); if(in_chunk) { last_chunk_patches.push_back(patchfile_pos); } patchfile_pos+=plen+length; } void FileClientChunked::writePatchSize(_i64 remote_fs) { m_patchfile->Seek(0); writeFileRepeat(m_patchfile, (char*)&remote_fs, sizeof(_i64)); patchfile_pos=sizeof(_i64); } bool FileClientChunked::hasError(void) { return has_error; } void FileClientChunked::invalidateLastPatches(void) { _i64 invalid_pos=-1; for(size_t i=0;iSeek(last_chunk_patches[i]); m_patchfile->Write((char*)&invalid_pos, sizeof(_i64)); } m_patchfile->Seek(patchfile_pos); patch_buf_pos=0; } void FileClientChunked::setDestroyPipe(bool b) { destroy_pipe=b; } size_t FileClientChunked::getTransferredBytes(void) { transferred_bytes+=pipe->getTransferedBytes(); pipe->resetTransferedBytes(); return transferred_bytes; }