From f35b1f8a2bb9bf29a64df3d025cbe68b15324095 Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Wed, 12 Feb 2014 11:07:34 +0100 Subject: [PATCH] Only starts 6 jobs in parallel --- src/mirall/csyncthread.cpp | 2 +- src/mirall/owncloudpropagator.cpp | 37 +++++++++++++++++++------- src/mirall/owncloudpropagator.h | 37 ++++++++++++++++++++++++-- src/mirall/owncloudpropagator_qnam.cpp | 5 +++- 4 files changed, 67 insertions(+), 14 deletions(-) diff --git a/src/mirall/csyncthread.cpp b/src/mirall/csyncthread.cpp index 01b0bf521e..ffd4fddcc8 100644 --- a/src/mirall/csyncthread.cpp +++ b/src/mirall/csyncthread.cpp @@ -709,7 +709,7 @@ QString CSyncThread::adjustRenamedPath(const QString& original) void CSyncThread::abort() { csync_request_abort(_csync_ctx); - if(_propagator); + if(_propagator) _propagator->abort(); } diff --git a/src/mirall/owncloudpropagator.cpp b/src/mirall/owncloudpropagator.cpp index 54fdcaf49c..881239698f 100644 --- a/src/mirall/owncloudpropagator.cpp +++ b/src/mirall/owncloudpropagator.cpp @@ -49,6 +49,9 @@ #include +/* The maximum number of active job in parallel */ +static const int maximumActiveJob = 6; + // We use some internals of csync: extern "C" int c_utimes(const char *, const struct timeval *); extern "C" void csync_win32_set_file_hidden( const char *file, bool h ); @@ -1110,7 +1113,7 @@ void PropagateDirectory::start() _current = -1; _hasError = SyncFileItem::NoStatus; if (!_firstJob) { - slotSubJobFinished(SyncFileItem::Success); + slotSubJobReady(); } else { startJob(_firstJob.data()); } @@ -1125,18 +1128,32 @@ void PropagateDirectory::slotSubJobFinished(SyncFileItem::Status status) } else if (status == SyncFileItem::NormalError || status == SyncFileItem::SoftError) { _hasError = status; } + _runningNow--; + slotSubJobReady(); +} - if (_current == -1) { - // Start all the jobs - foreach( PropagatorJob *next , _subJobs ) { - startJob(next); - } +void PropagateDirectory::slotSubJobReady() +{ + qDebug() << Q_FUNC_INFO << _runningNow << _propagator->_activeJobs; + + if (_runningNow && _current == -1) + return; // Ignore the case when the _fistJob is ready and not yet finished + if (_runningNow && _current >= 0 && _current < _subJobs.count()) { + // there is a job running and the current one is not ready yet, we can't start new job + qDebug() << _subJobs[_current]->_readySent << maximumActiveJob << _subJobs[_current]; + if (!_subJobs[_current]->_readySent || _propagator->_activeJobs >= maximumActiveJob) + return; } - _current ++; - if (_current >= _subJobs.size()) { - // We finished to process all the jobs - + _current++; + if (_current < _subJobs.size() && !_propagator->_abortRequested.fetchAndAddRelaxed(0)) { + PropagatorJob *next = _subJobs.at(_current); + startJob(next); + return; + } + // We finished to processing all the jobs + emitReady(); + if (!_runningNow) { if (!_item.isEmpty() && _hasError == SyncFileItem::NoStatus) { if( !_item._renameTarget.isEmpty() ) { _item._file = _item._renameTarget; diff --git a/src/mirall/owncloudpropagator.h b/src/mirall/owncloudpropagator.h index 7c2d0d8f46..5ae00af3e7 100644 --- a/src/mirall/owncloudpropagator.h +++ b/src/mirall/owncloudpropagator.h @@ -36,16 +36,39 @@ class PropagatorJob : public QObject { Q_OBJECT protected: OwncloudPropagator *_propagator; + void emitReady() { + bool wasReady = _readySent; + _readySent = true; + if (!wasReady) + emit ready(); + }; public: - explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator) {} + bool _readySent; + explicit PropagatorJob(OwncloudPropagator* propagator) : _propagator(propagator), _readySent(false) {} public slots: virtual void start() = 0; virtual void abort() {} signals: + /** + * Emitted when the job is fully finished + */ void finished(SyncFileItem::Status); + + /** + * Emitted when one item has been completed within a job. + */ void completed(const SyncFileItem &); + + /** + * Emitted when all the sub-jobs have been scheduled and + * we are ready and more jobs might be started + * This signal is not always emitted. + */ + void ready(); + void progress(Progress::Kind, const SyncFileItem& item, quint64 bytes, quint64 total); + }; /* @@ -64,12 +87,13 @@ public: SyncFileItem _item; int _current; // index of the current running job + int _runningNow; // number of subJob running now SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error explicit PropagateDirectory(OwncloudPropagator *propagator, const SyncFileItem &item = SyncFileItem()) : PropagatorJob(propagator) - , _firstJob(0), _item(item), _current(-1), _hasError(SyncFileItem::NoStatus) { } + , _firstJob(0), _item(item), _current(-1), _runningNow(0), _hasError(SyncFileItem::NoStatus) { } virtual ~PropagateDirectory() { qDeleteAll(_subJobs); @@ -92,10 +116,13 @@ private slots: connect(next, SIGNAL(finished(SyncFileItem::Status)), this, SLOT(slotSubJobFinished(SyncFileItem::Status)), Qt::QueuedConnection); connect(next, SIGNAL(completed(SyncFileItem)), this, SIGNAL(completed(SyncFileItem))); connect(next, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64)), this, SIGNAL(progress(Progress::Kind,SyncFileItem,quint64,quint64))); + connect(next, SIGNAL(ready()), this, SLOT(slotSubJobReady())); + _runningNow++; QMetaObject::invokeMethod(next, "start"); } void slotSubJobFinished(SyncFileItem::Status status); + void slotSubJobReady(); }; @@ -152,6 +179,7 @@ public: , _localDir((localDir.endsWith(QChar('/'))) ? localDir : localDir+'/' ) , _remoteDir((remoteDir.endsWith(QChar('/'))) ? remoteDir : remoteDir+'/' ) , _journal(progressDb) + , _activeJobs(0) { } void start(const SyncFileItemVector &_syncedItems); @@ -161,9 +189,14 @@ public: QAtomicInt _abortRequested; // boolean set by the main thread to abort. + /* The number of currently active jobs */ + int _activeJobs; + void overallTransmissionSizeChanged( qint64 change ); bool isInSharedDirectory(const QString& file); + + void abort() { _abortRequested.fetchAndStoreOrdered(true); if (_rootJob) diff --git a/src/mirall/owncloudpropagator_qnam.cpp b/src/mirall/owncloudpropagator_qnam.cpp index 1603d76805..0ba8d852d5 100644 --- a/src/mirall/owncloudpropagator_qnam.cpp +++ b/src/mirall/owncloudpropagator_qnam.cpp @@ -23,7 +23,6 @@ namespace Mirall { void PUTFileJob::start() { QNetworkRequest req; - qDebug() << _headers; for(QMap::const_iterator it = _headers.begin(); it != _headers.end(); ++it) { req.setRawHeader(it.key(), it.value()); qDebug() << it.key() << it.value(); @@ -217,10 +216,14 @@ void PropagateUploadFileQNAM::start() emit progress(Progress::StartUpload, _item, 0, file->size()); job->start(); + + _propagator->_activeJobs++; + emitReady(); } void PropagateUploadFileQNAM::slotPutFinished() { + _propagator->_activeJobs--; PUTFileJob *job = qobject_cast(sender()); Q_ASSERT(job);