diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index 281f34548e..ed473be211 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -742,6 +742,16 @@ PropagatorJob::JobParallelism PropagatorCompositeJob::parallelism() return FullParallelism; } +void PropagatorCompositeJob::slotSubJobAbortFinished() +{ + // Count that job has been finished + _abortsCount--; + + // Emit abort if last job has been aborted + if (_abortsCount == 0) { + emit abortFinished(); + } +} bool PropagatorCompositeJob::scheduleSelfOrChild() { @@ -900,7 +910,8 @@ void PropagateDirectory::slotFirstJobFinished(SyncFileItem::Status status) if (status != SyncFileItem::Success && status != SyncFileItem::Restoration) { if (_state != Finished) { - abort(); + // Synchronously abort + abort(AbortType::Synchronous); _state = Finished; emit finished(status); } diff --git a/src/libsync/owncloudpropagator.h b/src/libsync/owncloudpropagator.h index 605614902f..402226bdc9 100644 --- a/src/libsync/owncloudpropagator.h +++ b/src/libsync/owncloudpropagator.h @@ -65,6 +65,11 @@ class PropagatorJob : public QObject public: explicit PropagatorJob(OwncloudPropagator *propagator); + enum AbortType { + Synchronous, + Asynchronous + }; + enum JobState { NotYetStarted, Running, @@ -98,7 +103,14 @@ public: virtual qint64 committedDiskSpace() const { return 0; } public slots: - virtual void abort() {} + /* + * Asynchronous abort requires emit of abortFinished() signal, + * while synchronous is expected to abort immedietaly. + */ + virtual void abort(PropagatorJob::AbortType abortType) { + if (abortType == AbortType::Asynchronous) + emit abortFinished(); + } /** Starts this job, or a new subjob * returns true if a job was started. @@ -110,11 +122,14 @@ signals: */ void finished(SyncFileItem::Status); + /** + * Emitted when the abort is fully finished + */ + void abortFinished(SyncFileItem::Status status = SyncFileItem::NormalError); protected: OwncloudPropagator *propagator() const; }; - /* * Abstract class to propagate a single item */ @@ -185,10 +200,11 @@ public: SyncFileItemVector _tasksToDo; QVector _runningJobs; SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error + quint64 _abortsCount; explicit PropagatorCompositeJob(OwncloudPropagator *propagator) : PropagatorJob(propagator) - , _hasError(SyncFileItem::NoStatus) + , _hasError(SyncFileItem::NoStatus), _abortsCount(0) { } @@ -209,15 +225,32 @@ public: virtual bool scheduleSelfOrChild() Q_DECL_OVERRIDE; virtual JobParallelism parallelism() Q_DECL_OVERRIDE; - virtual void abort() Q_DECL_OVERRIDE + + /* + * Abort synchronously or asynchronously - some jobs + * require to be finished without immediete abort (abort on job might + * cause conflicts/duplicated files - owncloud/client/issues/5949) + */ + virtual void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE { - foreach (PropagatorJob *j, _runningJobs) - j->abort(); + if (!_runningJobs.empty()) { + _abortsCount = _runningJobs.size(); + foreach (PropagatorJob *j, _runningJobs) { + if (abortType == AbortType::Asynchronous) { + connect(j, &PropagatorJob::abortFinished, + this, &PropagatorCompositeJob::slotSubJobAbortFinished); + } + j->abort(abortType); + } + } else if (abortType == AbortType::Asynchronous){ + emit abortFinished(); + } } qint64 committedDiskSpace() const Q_DECL_OVERRIDE; private slots: + void slotSubJobAbortFinished(); bool possiblyRunNextJob(PropagatorJob *next) { if (next->_state == NotYetStarted) { @@ -258,11 +291,17 @@ public: virtual bool scheduleSelfOrChild() Q_DECL_OVERRIDE; virtual JobParallelism parallelism() Q_DECL_OVERRIDE; - virtual void abort() Q_DECL_OVERRIDE + virtual void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE { if (_firstJob) - _firstJob->abort(); - _subJobs.abort(); + // Force first job to abort synchronously + // even if caller allows async abort (asyncAbort) + _firstJob->abort(AbortType::Synchronous); + + if (abortType == AbortType::Asynchronous){ + connect(&_subJobs, &PropagatorCompositeJob::abortFinished, this, &PropagateDirectory::abortFinished); + } + _subJobs.abort(abortType); } void increaseAffectedCount() @@ -280,6 +319,7 @@ private slots: void slotFirstJobFinished(SyncFileItem::Status status); void slotSubJobsFinished(SyncFileItem::Status status); + }; @@ -324,6 +364,7 @@ public: , _chunkSize(10 * 1000 * 1000) // 10 MB, overridden in setSyncOptions , _account(account) { + qRegisterMetaType("PropagatorJob::AbortType"); } ~OwncloudPropagator(); @@ -406,11 +447,19 @@ public: { _abortRequested.fetchAndStoreOrdered(true); if (_rootJob) { - // We're possibly already in an item's finished stack - QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection); + // Connect to abortFinished which signals that abort has been asynchronously finished + connect(_rootJob.data(), &PropagateDirectory::abortFinished, this, &OwncloudPropagator::emitFinished); + + // Use Queued Connection because we're possibly already in an item's finished stack + QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection, + Q_ARG(PropagatorJob::AbortType, PropagatorJob::AbortType::Asynchronous)); + + // Give asynchronous abort 5000 msec to finish on its own + QTimer::singleShot(5000, this, SLOT(abortTimeout())); + } else { + // No root job, call emitFinished + emitFinished(SyncFileItem::NormalError); } - // abort() of all jobs will likely have already resulted in finished being emitted, but just in case. - QMetaObject::invokeMethod(this, "emitFinished", Qt::QueuedConnection, Q_ARG(SyncFileItem::Status, SyncFileItem::NormalError)); } // timeout in seconds @@ -431,6 +480,13 @@ public: private slots: + void abortTimeout() + { + // Abort synchronously and finish + _rootJob.data()->abort(PropagatorJob::AbortType::Synchronous); + emitFinished(SyncFileItem::NormalError); + } + /** Emit the finished signal and make sure it is only emitted once */ void emitFinished(SyncFileItem::Status status) { diff --git a/src/libsync/propagatedownload.cpp b/src/libsync/propagatedownload.cpp index 72b311a3fd..9be1b82451 100644 --- a/src/libsync/propagatedownload.cpp +++ b/src/libsync/propagatedownload.cpp @@ -900,9 +900,13 @@ void PropagateDownloadFile::slotDownloadProgress(qint64 received, qint64) } -void PropagateDownloadFile::abort() +void PropagateDownloadFile::abort(PropagatorJob::AbortType abortType) { if (_job && _job->reply()) _job->reply()->abort(); + + if (abortType == AbortType::Asynchronous) { + emit abortFinished(); + } } } diff --git a/src/libsync/propagatedownload.h b/src/libsync/propagatedownload.h index 26361541c2..80998f67f6 100644 --- a/src/libsync/propagatedownload.h +++ b/src/libsync/propagatedownload.h @@ -185,7 +185,7 @@ private slots: /// Called when it's time to update the db metadata void updateMetadata(bool isConflict); - void abort() Q_DECL_OVERRIDE; + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; void slotDownloadProgress(qint64, qint64); void slotChecksumFail(const QString &errMsg); diff --git a/src/libsync/propagateremotedelete.cpp b/src/libsync/propagateremotedelete.cpp index 2e7d03b221..f67b2b3afb 100644 --- a/src/libsync/propagateremotedelete.cpp +++ b/src/libsync/propagateremotedelete.cpp @@ -75,10 +75,14 @@ void PropagateRemoteDelete::start() _job->start(); } -void PropagateRemoteDelete::abort() +void PropagateRemoteDelete::abort(PropagatorJob::AbortType abortType) { if (_job && _job->reply()) _job->reply()->abort(); + + if (abortType == AbortType::Asynchronous) { + emit abortFinished(); + } } void PropagateRemoteDelete::slotDeleteJobFinished() diff --git a/src/libsync/propagateremotedelete.h b/src/libsync/propagateremotedelete.h index 6246f7cbf0..4d07843e42 100644 --- a/src/libsync/propagateremotedelete.h +++ b/src/libsync/propagateremotedelete.h @@ -52,7 +52,7 @@ public: { } void start() Q_DECL_OVERRIDE; - void abort() Q_DECL_OVERRIDE; + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return !_item->isDirectory(); } diff --git a/src/libsync/propagateremotemkdir.cpp b/src/libsync/propagateremotemkdir.cpp index fb0869a2b3..98a72d48aa 100644 --- a/src/libsync/propagateremotemkdir.cpp +++ b/src/libsync/propagateremotemkdir.cpp @@ -60,10 +60,14 @@ void PropagateRemoteMkdir::slotStartMkcolJob() _job->start(); } -void PropagateRemoteMkdir::abort() +void PropagateRemoteMkdir::abort(PropagatorJob::AbortType abortType) { if (_job && _job->reply()) _job->reply()->abort(); + + if (abortType == AbortType::Asynchronous) { + emit abortFinished(); + } } void PropagateRemoteMkdir::setDeleteExisting(bool enabled) diff --git a/src/libsync/propagateremotemkdir.h b/src/libsync/propagateremotemkdir.h index a89fbe99e3..8a08efabd1 100644 --- a/src/libsync/propagateremotemkdir.h +++ b/src/libsync/propagateremotemkdir.h @@ -35,7 +35,7 @@ public: { } void start() Q_DECL_OVERRIDE; - void abort() Q_DECL_OVERRIDE; + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; // Creating a directory should be fast. bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return true; } diff --git a/src/libsync/propagateremotemove.cpp b/src/libsync/propagateremotemove.cpp index 37ca9fecd0..35a2486cf9 100644 --- a/src/libsync/propagateremotemove.cpp +++ b/src/libsync/propagateremotemove.cpp @@ -117,10 +117,14 @@ void PropagateRemoteMove::start() _job->start(); } -void PropagateRemoteMove::abort() +void PropagateRemoteMove::abort(PropagatorJob::AbortType abortType) { if (_job && _job->reply()) _job->reply()->abort(); + + if (abortType == AbortType::Asynchronous) { + emit abortFinished(); + } } void PropagateRemoteMove::slotMoveJobFinished() diff --git a/src/libsync/propagateremotemove.h b/src/libsync/propagateremotemove.h index bf6661a656..1a1b8ce921 100644 --- a/src/libsync/propagateremotemove.h +++ b/src/libsync/propagateremotemove.h @@ -56,7 +56,7 @@ public: { } void start() Q_DECL_OVERRIDE; - void abort() Q_DECL_OVERRIDE; + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; JobParallelism parallelism() Q_DECL_OVERRIDE { return _item->isDirectory() ? WaitForFinished : FullParallelism; } /** diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index eca0426789..6de6316af5 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -558,20 +558,24 @@ void PropagateUploadFileCommon::slotJobDestroyed(QObject *job) _jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job), _jobs.end()); } -void PropagateUploadFileCommon::abort() +void PropagateUploadFileCommon::abort(PropagatorJob::AbortType abortType) { foreach (auto *job, _jobs) { if (job->reply()) { job->reply()->abort(); } } + + if (abortType == AbortType::Asynchronous) { + emit abortFinished(); + } } // This function is used whenever there is an error occuring and jobs might be in progress void PropagateUploadFileCommon::abortWithError(SyncFileItem::Status status, const QString &error) { _finished = true; - abort(); + abort(AbortType::Synchronous); done(status, error); } @@ -625,4 +629,33 @@ void PropagateUploadFileCommon::finalize() done(SyncFileItem::Success); } + +void PropagateUploadFileCommon::prepareAbort(PropagatorJob::AbortType abortType) { + if (!_jobs.empty()) { + // Count number of jobs to be aborted asynchronously + _abortCount = _jobs.size(); + + foreach (AbstractNetworkJob *job, _jobs) { + // Check if async abort is requested + if (job->reply() && abortType == AbortType::Asynchronous) { + // Connect to finished signal of job reply + // to asynchonously finish the abort + connect(job->reply(), &QNetworkReply::finished, this, &PropagateUploadFileCommon::slotReplyAbortFinished); + } + } + } else if (abortType == AbortType::Asynchronous) { + // Empty job list, emit abortFinished immedietaly + emit abortFinished(); + } +} + +void PropagateUploadFileCommon::slotReplyAbortFinished() +{ + _abortCount--; + + if (_abortCount == 0) { + emit abortFinished(); + } +} + } diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index 0168244643..361820530c 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -129,6 +129,11 @@ public: return true; } + QIODevice *device() + { + return _device; + } + QString errorString() { return _errorString.isEmpty() ? AbstractNetworkJob::errorString() : _errorString; @@ -205,6 +210,7 @@ protected: QVector _jobs; /// network jobs that are currently in transit bool _finished BITFIELD(1); /// Tells that all the jobs have been finished bool _deleteExisting BITFIELD(1); + quint64 _abortCount; /// Keep track of number of aborted items // measure the performance of checksum calc and upload #ifdef WITH_TESTING @@ -218,6 +224,7 @@ public: : PropagateItemJob(propagator, item) , _finished(false) , _deleteExisting(false) + , _abortCount(0) { } @@ -248,13 +255,20 @@ public: void abortWithError(SyncFileItem::Status status, const QString &error); public slots: - void abort() Q_DECL_OVERRIDE; + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; void slotJobDestroyed(QObject *job); private slots: + void slotReplyAbortFinished(); void slotPollFinished(); protected: + /** + * Prepares the abort e.g. connects proper signals and slots + * to the subjobs to abort asynchronously + */ + void prepareAbort(PropagatorJob::AbortType abortType); + /** * Checks whether the current error is one that should reset the whole * transfer if it happens too often. If so: Bump UploadInfo::errorCount @@ -303,7 +317,6 @@ private: return propagator()->syncOptions()._initialChunkSize; } - public: PropagateUploadFileV1(OwncloudPropagator *propagator, const SyncFileItemPtr &item) : PropagateUploadFileCommon(propagator, item) @@ -311,7 +324,8 @@ public: } void doStartUpload() Q_DECL_OVERRIDE; - +public slots: + void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE; private slots: void startNextChunk(); void slotPutFinished(); @@ -361,6 +375,8 @@ public: private: void startNewUpload(); void startNextChunk(); +public slots: + void abort(AbortType abortType) Q_DECL_OVERRIDE; private slots: void slotPropfindFinished(); void slotPropfindFinishedWithError(); diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp index 5026f28a37..f8873654f1 100644 --- a/src/libsync/propagateuploadng.cpp +++ b/src/libsync/propagateuploadng.cpp @@ -491,4 +491,26 @@ void PropagateUploadFileNG::slotUploadProgress(qint64 sent, qint64 total) } propagator()->reportProgress(*_item, _sent + sent - total); } + +void PropagateUploadFileNG::abort(PropagatorJob::AbortType abortType) +{ + // Prepare abort + prepareAbort(abortType); + + // Abort all jobs (if there are any left), except final PUT + foreach (AbstractNetworkJob *job, _jobs) { + if (job->reply()) { + if (abortType == AbortType::Asynchronous && qobject_cast(job)){ + // If it is async abort, dont abort + // MoveJob since it might result in conflict, + // only PUT and MKDIR jobs can be safely aborted. + continue; + } + + // Abort the job + job->reply()->abort(); + } + } +} + } diff --git a/src/libsync/propagateuploadv1.cpp b/src/libsync/propagateuploadv1.cpp index c8b0e421df..ce63db5a52 100644 --- a/src/libsync/propagateuploadv1.cpp +++ b/src/libsync/propagateuploadv1.cpp @@ -350,4 +350,29 @@ void PropagateUploadFileV1::slotUploadProgress(qint64 sent, qint64 total) } propagator()->reportProgress(*_item, amount); } + +void PropagateUploadFileV1::abort(PropagatorJob::AbortType abortType) +{ + // Prepare abort + prepareAbort(abortType); + + // Abort all jobs (if there are any left), except final PUT + foreach (AbstractNetworkJob *job, _jobs) { + if (job->reply()) { + // If asynchronous abort allowed, + // dont abort final PUT which uploaded its data, + // since this might result in conflicts + if (PUTFileJob *putJob = qobject_cast(job)){ + if (abortType == AbortType::Asynchronous && (((_currentChunk + _startChunk) % _chunkCount) == 0) + && putJob->device()->atEnd()) { + continue; + } + } + + // Abort the job + job->reply()->abort(); + } + } +} + }