From e4128cd5d8aac19491cbe7d4b85462e217bfaa79 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 14 Aug 2013 19:59:16 +0200 Subject: [PATCH] progress with new propagator --- src/mirall/csyncthread.cpp | 101 ++++++++++++++++-------------- src/mirall/csyncthread.h | 8 +-- src/mirall/folder.cpp | 2 +- src/mirall/owncloudpropagator.cpp | 71 +++++++++++++++++---- src/mirall/owncloudpropagator.h | 15 +++++ src/mirall/progressdispatcher.h | 15 +++-- src/mirall/syncfileitem.h | 1 + 7 files changed, 143 insertions(+), 70 deletions(-) diff --git a/src/mirall/csyncthread.cpp b/src/mirall/csyncthread.cpp index 5e756f8121..099cc77619 100644 --- a/src/mirall/csyncthread.cpp +++ b/src/mirall/csyncthread.cpp @@ -194,6 +194,7 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote ) item._isDirectory = file->type == CSYNC_FTW_TYPE_DIR; item._modtime = file->modtime; item._etag = file->md5; + item._size = file->size; SyncFileItem::Direction dir; @@ -227,6 +228,9 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote ) dir = !remote ? SyncFileItem::Down : SyncFileItem::Up; break; case CSYNC_INSTRUCTION_CONFLICT: + _progressInfo.overall_file_count++; + _progressInfo.overall_transmission_size += file->size; + //fall trough case CSYNC_INSTRUCTION_IGNORE: case CSYNC_INSTRUCTION_ERROR: dir = SyncFileItem::None; @@ -234,6 +238,9 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote ) case CSYNC_INSTRUCTION_EVAL: case CSYNC_INSTRUCTION_NEW: case CSYNC_INSTRUCTION_SYNC: + _progressInfo.overall_file_count++; + _progressInfo.overall_transmission_size += file->size; + //fall trough case CSYNC_INSTRUCTION_STAT_ERROR: case CSYNC_INSTRUCTION_DELETED: case CSYNC_INSTRUCTION_UPDATED: @@ -272,6 +279,9 @@ int CSyncThread::treewalkFinalize(TREE_WALK_FILE* file) if (action != _performedActions.constEnd()) { if (file->instruction != CSYNC_INSTRUCTION_NONE) { // it is NONE if we are in the wrong tree (remote vs. local) + + qDebug() << "UPDATING " << file->path << action->instruction; + file->instruction = action->instruction; } @@ -325,22 +335,6 @@ void CSyncThread::startSync() // maybe move this somewhere else where it can influence a running sync? MirallConfigFile cfg; - int downloadLimit = 0; - if (cfg.useDownloadLimit()) { - downloadLimit = cfg.downloadLimit() * 1000; - } - csync_set_module_property(_csync_ctx, "bandwidth_limit_download", &downloadLimit); - - int uploadLimit = -75; // 75% - int useUpLimit = cfg.useUploadLimit(); - if ( useUpLimit >= 1) { - uploadLimit = cfg.uploadLimit() * 1000; - } else if (useUpLimit == 0) { - uploadLimit = 0; - } - csync_set_module_property(_csync_ctx, "bandwidth_limit_upload", &uploadLimit); - - csync_set_progress_callback( _csync_ctx, cb_progress ); csync_set_module_property(_csync_ctx, "csync_context", _csync_ctx); csync_set_userdata(_csync_ctx, this); @@ -383,6 +377,8 @@ void CSyncThread::startSync() return; } + _progressInfo = Progress::Info(); + _hasFiles = false; bool walkOk = true; if( csync_walk_local_tree(_csync_ctx, &treewalkLocal, 0) < 0 ) { @@ -423,13 +419,28 @@ void CSyncThread::startSync() _propagator.reset(new OwncloudPropagator (session, _localPath, _remotePath, &_progressDataBase)); connect(_propagator.data(), SIGNAL(completed(SyncFileItem, CSYNC_ERROR_CODE)), this, SLOT(transferCompleted(SyncFileItem, CSYNC_ERROR_CODE)), Qt::QueuedConnection); + connect(_propagator.data(), SIGNAL(progress(Progress::Kind,QString,quint64,quint64)), + this, SLOT(slotProgress(Progress::Kind,QString,quint64,quint64))); _iterator = 0; - startNextTransfer(); - // if( csync_propagate(_csync_ctx) < 0 ) { - // handleSyncError(_csync_ctx, "cysnc_reconcile"); - // return; - // } + int downloadLimit = 0; + if (cfg.useDownloadLimit()) { + downloadLimit = cfg.downloadLimit() * 1000; + } + _propagator->_downloadLimit = downloadLimit; + + int uploadLimit = -75; // 75% + int useUpLimit = cfg.useUploadLimit(); + if ( useUpLimit >= 1) { + uploadLimit = cfg.uploadLimit() * 1000; + } else if (useUpLimit == 0) { + uploadLimit = 0; + } + _propagator->_uploadLimit = uploadLimit; + + slotProgress(Progress::StartSync, QString(), 0, 0); + + startNextTransfer(); } void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE error) @@ -447,7 +458,7 @@ void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE e if (idx >= 0) { _syncedItems[idx]._instruction = CSYNC_INSTRUCTION_ERROR; _syncedItems[idx]._errorString = csyncErrorToString( error ); - _syncedItems[idx]._errorDetail = item._errorString; + _syncedItems[idx]._errorDetail = item._errorDetail; _syncedItems[idx]._httpCode = item._httpCode; qDebug() << "File " << item._file << " propagator error " << _syncedItems[idx]._errorString << "(" << item._errorString << ")"; @@ -474,13 +485,13 @@ void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE e _performedActions.insert(item._renameTarget.toUtf8(), a); } - if (!item._isDirectory && a.instruction == CSYNC_INSTRUCTION_UPDATED - && item._dir == SyncFileItem::Down) { - emit fileReceived(item._file); + if (!item._isDirectory && a.instruction == CSYNC_INSTRUCTION_UPDATED) { + slotProgress((item._dir != SyncFileItem::Up) ? Progress::EndDownload : Progress::EndUpload, + item._file, item._size, item._size); + _progressInfo.current_file_no++; + _progressInfo.overall_current_bytes += item._size; } - //TODO progress %; - startNextTransfer(); } @@ -499,6 +510,13 @@ void CSyncThread::startNextTransfer() continue; } _propagator->_etag.clear(); // FIXME : set to the right one + + if (item._instruction == CSYNC_INSTRUCTION_SYNC || item._instruction == CSYNC_INSTRUCTION_NEW + || item._instruction == CSYNC_INSTRUCTION_CONFLICT) { + slotProgress((item._dir != SyncFileItem::Up) ? Progress::StartDownload : Progress::StartUpload, + item._file, 0, item._size); + } + _propagator->propagate(item); return; //propagate is async. } @@ -517,6 +535,7 @@ void CSyncThread::startNextTransfer() csync_commit(_csync_ctx); qDebug() << "CSync run took " << _syncTime.elapsed() << " Milliseconds"; + slotProgress(Progress::EndSync,QString(), 0 , 0); emit finished(); _propagator.reset(0); _syncMutex.unlock(); @@ -567,32 +586,20 @@ Progress::Kind CSyncThread::csyncToProgressKind( enum csync_notify_type_e kind ) return pKind; } -void CSyncThread::cb_progress( CSYNC_PROGRESS *progress, void *userdata ) +void CSyncThread::slotProgress(Progress::Kind kind, const QString &file, quint64 curr, quint64 total) { - if( !progress ) { - qDebug() << "No progress block in progress callback found!"; - return; - } - if( !userdata ) { - qDebug() << "No thread given in progress callback!"; - return; - } - Progress::Info pInfo; - CSyncThread *thread = static_cast(userdata); + Progress::Info pInfo = _progressInfo; - pInfo.kind = thread->csyncToProgressKind( progress->kind ); - pInfo.current_file = QUrl::fromEncoded( progress->path ).toString(); - pInfo.file_size = progress->file_size; - pInfo.current_file_bytes = progress->curr_bytes; + pInfo.kind = kind; + pInfo.current_file = file; + pInfo.file_size = total; + pInfo.current_file_bytes = curr; - pInfo.overall_file_count = progress->overall_file_count; - pInfo.current_file_no = progress->current_file_no; - pInfo.overall_transmission_size = progress->overall_transmission_size; - pInfo.overall_current_bytes = progress->current_overall_bytes; + pInfo.overall_current_bytes += curr; pInfo.timestamp = QDateTime::currentDateTime(); // Connect to something in folder! - thread->transmissionProgress( pInfo ); + transmissionProgress( pInfo ); } /* Given a path on the remote, give the path as it is when the rename is done */ diff --git a/src/mirall/csyncthread.h b/src/mirall/csyncthread.h index eb56517f3f..cfd7985953 100644 --- a/src/mirall/csyncthread.h +++ b/src/mirall/csyncthread.h @@ -60,8 +60,6 @@ public: Q_INVOKABLE void startSync(); signals: - void fileReceived( const QString& ); - void fileRemoved( const QString& ); void csyncError( const QString& ); void csyncWarning( const QString& ); void csyncUnavailable(); @@ -79,12 +77,11 @@ signals: private slots: void transferCompleted(const SyncFileItem& item, CSYNC_ERROR_CODE error); void startNextTransfer(); + void slotProgress(Progress::Kind kind, const QString& file, quint64, quint64); private: void handleSyncError(CSYNC *ctx, const char *state); - static void cb_progress( CSYNC_PROGRESS *progress, void *userdata ); - static int treewalkLocal( TREE_WALK_FILE*, void *); static int treewalkRemote( TREE_WALK_FILE*, void *); int treewalkFile( TREE_WALK_FILE*, bool ); @@ -116,6 +113,9 @@ private: QString adjustRenamedPath(const QString &original); bool _hasFiles; // true if there is at least one file that is not ignored or removed + Progress::Info _progressInfo; + int _downloadLimit; + int _uploadLimit; friend struct CSyncRunScopeHelper; }; diff --git a/src/mirall/folder.cpp b/src/mirall/folder.cpp index e789ae3747..4cad2670b7 100644 --- a/src/mirall/folder.cpp +++ b/src/mirall/folder.cpp @@ -488,7 +488,7 @@ void Folder::startSync(const QStringList &pathList) _thread = new QThread(this); _thread->setPriority(QThread::LowPriority); setIgnoredFiles(); - _csync = new CSyncThread( _csync_ctx, path(), QUrl(Folder::secondPath()).path()); + _csync = new CSyncThread( _csync_ctx, path(), QUrl(ownCloudInfo::instance()->webdavUrl() + secondPath()).path()); _csync->moveToThread(_thread); diff --git a/src/mirall/owncloudpropagator.cpp b/src/mirall/owncloudpropagator.cpp index c8d540d7bd..84f0c6ae23 100644 --- a/src/mirall/owncloudpropagator.cpp +++ b/src/mirall/owncloudpropagator.cpp @@ -235,11 +235,15 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item) _progressDb->remove(item._file); } - //TODO - //ne_set_notifier(dav_session.ctx, ne_notify_status_cb, write_ctx); + ne_set_notifier(_session, notify_status_cb, this); + _lastTime.restart(); + _lastProgress = 0; + _chunked_done = 0; + _chunked_total_size = item._size; + _currentFile = item._file; if( state == HBF_SUCCESS ) { - //chunked_total_size = trans->stat_size; + _chunked_total_size = trans->stat_size; /* Transfer all the chunks through the HTTP session using PUT. */ state = hbf_transfer( _session, trans.data(), "PUT" ); } @@ -272,14 +276,10 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item) } } while( !finished ); -// if (_progresscb) { -// ne_set_notifier(dav_session.ctx, 0, 0); -// _progresscb(write_ctx->url, rc != 0 ? CSYNC_NOTIFY_ERROR : -// CSYNC_NOTIFY_FINISHED_UPLOAD, error_code, -// (long long)(error_string), dav_session.userdata); -// } + ne_set_notifier(_session, 0, 0); updateMTimeAndETag(uri.data(), item._modtime); + return CSYNC_INSTRUCTION_UPDATED; } @@ -421,8 +421,6 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, /* actually do the request */ int retry = 0; -// ne_set_notifier(dav_session.ctx, ne_notify_status_cb, write_ctx); - QScopedPointer uri(ne_path_escape((_remoteDir + item._file).toUtf8())); DownloadContext writeCtx(&tmpFile); @@ -444,6 +442,11 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, * either the compressed- or uncompressed reader. */ ne_hook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx); + ne_set_notifier(_session, notify_status_cb, this); + _lastProgress = 0; + _lastTime.start(); + _chunked_done = _chunked_total_size = 0; + _currentFile = item._file; int neon_stat = ne_request_dispatch(req.data()); @@ -453,7 +456,8 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item, /* delete the hook again, otherwise they get chained as they are with the session */ ne_unhook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx ); -// ne_set_notifier(_session, 0, 0); + ne_set_notifier(_session, 0, 0); + _chunked_done = _chunked_total_size = 0; if( updateErrorFromSession(neon_stat, req.data() ) ) { qDebug("Error GET: Neon: %d", neon_stat); @@ -648,6 +652,49 @@ bool OwncloudPropagator::updateErrorFromSession(int neon_code, ne_request *req) return re; } +void OwncloudPropagator::notify_status_cb(void* userdata, ne_session_status status, + const ne_session_status_info* info) +{ + OwncloudPropagator* this_ = reinterpret_cast(userdata); + + if ((status == ne_status_sending || status == ne_status_recving)) { + if (info->sr.total > 0) { + emit this_->progress(Progress::Context, this_->_currentFile, + this_->_chunked_done + info->sr.progress, + this_->_chunked_total_size ? this_->_chunked_total_size : info->sr.total ); + } + if (this_->_chunked_total_size && info->sr.total > 0 && info->sr.total == info->sr.progress) { + this_->_chunked_done += info->sr.total; + } + } + + /* throttle connection */ + int bandwidth_limit = 0; + if (status == ne_status_sending) bandwidth_limit = this_->_uploadLimit; + if (status == ne_status_recving) bandwidth_limit = this_->_downloadLimit; + if (bandwidth_limit > 0) { + int64_t diff = this_->_lastTime.nsecsElapsed() / 1000; + int64_t len = info->sr.progress - this_->_lastProgress; + if (len > 0 && diff > 0 && (1000000 * len / diff) > (int64_t)bandwidth_limit) { + int64_t wait_time = (1000000 * len / bandwidth_limit) - diff; + if (wait_time > 0) { + usleep(wait_time); + } + } + this_->_lastProgress = info->sr.progress; + this_->_lastTime.start(); + } else if (bandwidth_limit < 0 && bandwidth_limit > -100) { + int64_t diff = this_->_lastTime.nsecsElapsed() / 1000; + if (diff > 0) { + // -bandwidth_limit is the % of bandwidth + int64_t wait_time = -diff * (1 + 100.0 / bandwidth_limit); + if (wait_time > 0) { + usleep(wait_time); + } + } + this_->_lastTime.start(); + } +} } diff --git a/src/mirall/owncloudpropagator.h b/src/mirall/owncloudpropagator.h index 0881e708b1..ad51d9e503 100644 --- a/src/mirall/owncloudpropagator.h +++ b/src/mirall/owncloudpropagator.h @@ -18,8 +18,10 @@ #include #include #include +#include #include "syncfileitem.h" +#include "progressdispatcher.h" struct ne_session_s; struct ne_decompress_s; @@ -58,6 +60,15 @@ class OwncloudPropagator : public QObject { bool updateErrorFromSession(int neon_code = 0, ne_request *req = NULL); + QElapsedTimer _lastTime; + quint64 _lastProgress; + quint64 _chunked_total_size; + quint64 _chunked_done; + QString _currentFile; + + static void notify_status_cb (void *userdata, ne_session_status status, + const ne_session_status_info *info); + public: OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir, ProgressDatabase *progressDb) @@ -76,8 +87,12 @@ public: QByteArray _etag; bool _hasFatalError; + int _downloadLimit; + int _uploadLimit; + signals: void completed(const SyncFileItem &, CSYNC_ERROR_CODE); + void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total); }; diff --git a/src/mirall/progressdispatcher.h b/src/mirall/progressdispatcher.h index e508f4dc71..64bf01c06b 100644 --- a/src/mirall/progressdispatcher.h +++ b/src/mirall/progressdispatcher.h @@ -28,7 +28,7 @@ namespace Mirall { class Progress { public: - typedef enum { + enum Kind { Invalid, StartSync, Download, @@ -43,9 +43,9 @@ public: StartDelete, EndDelete, Error - } Kind; + }; - typedef struct { + struct Info { Kind kind; QString folder; QString current_file; @@ -59,15 +59,18 @@ public: QDateTime timestamp; - } Info; + Info() : kind(Invalid), file_size(0), current_file_bytes(0), + overall_file_count(0), current_file_no(0), + overall_transmission_size(0), overall_current_bytes(0) { } + }; - typedef struct { + struct SyncProblem { QString folder; QString current_file; QString error_message; int error_code; QDateTime timestamp; - } SyncProblem; + }; static QString asActionString( Kind ); static QString asResultString( Kind ); diff --git a/src/mirall/syncfileitem.h b/src/mirall/syncfileitem.h index fa92502529..3885cf3b28 100644 --- a/src/mirall/syncfileitem.h +++ b/src/mirall/syncfileitem.h @@ -60,6 +60,7 @@ public: bool _isDirectory; time_t _modtime; QByteArray _etag; + quint64 _size; QString _errorString; QString _errorDetail;