Fix paused sync file move issue #5949

Dont abort final chunk immedietally

Use sync and async aborts
This commit is contained in:
Piotr Mrowczynski 2017-08-11 00:03:03 +02:00 committed by ckamm
parent f598ac89ac
commit e10775d34f
14 changed files with 206 additions and 27 deletions

View File

@ -742,6 +742,16 @@ PropagatorJob::JobParallelism PropagatorCompositeJob::parallelism()
return FullParallelism;
}
void PropagatorCompositeJob::slotSubJobAbortFinished()
{
// Count that job has been finished
_abortsCount--;
// Emit abort if last job has been aborted
if (_abortsCount == 0) {
emit abortFinished();
}
}
bool PropagatorCompositeJob::scheduleSelfOrChild()
{
@ -900,7 +910,8 @@ void PropagateDirectory::slotFirstJobFinished(SyncFileItem::Status status)
if (status != SyncFileItem::Success && status != SyncFileItem::Restoration) {
if (_state != Finished) {
abort();
// Synchronously abort
abort(AbortType::Synchronous);
_state = Finished;
emit finished(status);
}

View File

@ -65,6 +65,11 @@ class PropagatorJob : public QObject
public:
explicit PropagatorJob(OwncloudPropagator *propagator);
enum AbortType {
Synchronous,
Asynchronous
};
enum JobState {
NotYetStarted,
Running,
@ -98,7 +103,14 @@ public:
virtual qint64 committedDiskSpace() const { return 0; }
public slots:
virtual void abort() {}
/*
* Asynchronous abort requires emit of abortFinished() signal,
* while synchronous is expected to abort immedietaly.
*/
virtual void abort(PropagatorJob::AbortType abortType) {
if (abortType == AbortType::Asynchronous)
emit abortFinished();
}
/** Starts this job, or a new subjob
* returns true if a job was started.
@ -110,11 +122,14 @@ signals:
*/
void finished(SyncFileItem::Status);
/**
* Emitted when the abort is fully finished
*/
void abortFinished(SyncFileItem::Status status = SyncFileItem::NormalError);
protected:
OwncloudPropagator *propagator() const;
};
/*
* Abstract class to propagate a single item
*/
@ -185,10 +200,11 @@ public:
SyncFileItemVector _tasksToDo;
QVector<PropagatorJob *> _runningJobs;
SyncFileItem::Status _hasError; // NoStatus, or NormalError / SoftError if there was an error
quint64 _abortsCount;
explicit PropagatorCompositeJob(OwncloudPropagator *propagator)
: PropagatorJob(propagator)
, _hasError(SyncFileItem::NoStatus)
, _hasError(SyncFileItem::NoStatus), _abortsCount(0)
{
}
@ -209,15 +225,32 @@ public:
virtual bool scheduleSelfOrChild() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
virtual void abort() Q_DECL_OVERRIDE
/*
* Abort synchronously or asynchronously - some jobs
* require to be finished without immediete abort (abort on job might
* cause conflicts/duplicated files - owncloud/client/issues/5949)
*/
virtual void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE
{
foreach (PropagatorJob *j, _runningJobs)
j->abort();
if (!_runningJobs.empty()) {
_abortsCount = _runningJobs.size();
foreach (PropagatorJob *j, _runningJobs) {
if (abortType == AbortType::Asynchronous) {
connect(j, &PropagatorJob::abortFinished,
this, &PropagatorCompositeJob::slotSubJobAbortFinished);
}
j->abort(abortType);
}
} else if (abortType == AbortType::Asynchronous){
emit abortFinished();
}
}
qint64 committedDiskSpace() const Q_DECL_OVERRIDE;
private slots:
void slotSubJobAbortFinished();
bool possiblyRunNextJob(PropagatorJob *next)
{
if (next->_state == NotYetStarted) {
@ -258,11 +291,17 @@ public:
virtual bool scheduleSelfOrChild() Q_DECL_OVERRIDE;
virtual JobParallelism parallelism() Q_DECL_OVERRIDE;
virtual void abort() Q_DECL_OVERRIDE
virtual void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE
{
if (_firstJob)
_firstJob->abort();
_subJobs.abort();
// Force first job to abort synchronously
// even if caller allows async abort (asyncAbort)
_firstJob->abort(AbortType::Synchronous);
if (abortType == AbortType::Asynchronous){
connect(&_subJobs, &PropagatorCompositeJob::abortFinished, this, &PropagateDirectory::abortFinished);
}
_subJobs.abort(abortType);
}
void increaseAffectedCount()
@ -280,6 +319,7 @@ private slots:
void slotFirstJobFinished(SyncFileItem::Status status);
void slotSubJobsFinished(SyncFileItem::Status status);
};
@ -324,6 +364,7 @@ public:
, _chunkSize(10 * 1000 * 1000) // 10 MB, overridden in setSyncOptions
, _account(account)
{
qRegisterMetaType<PropagatorJob::AbortType>("PropagatorJob::AbortType");
}
~OwncloudPropagator();
@ -406,11 +447,19 @@ public:
{
_abortRequested.fetchAndStoreOrdered(true);
if (_rootJob) {
// We're possibly already in an item's finished stack
QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection);
// Connect to abortFinished which signals that abort has been asynchronously finished
connect(_rootJob.data(), &PropagateDirectory::abortFinished, this, &OwncloudPropagator::emitFinished);
// Use Queued Connection because we're possibly already in an item's finished stack
QMetaObject::invokeMethod(_rootJob.data(), "abort", Qt::QueuedConnection,
Q_ARG(PropagatorJob::AbortType, PropagatorJob::AbortType::Asynchronous));
// Give asynchronous abort 5000 msec to finish on its own
QTimer::singleShot(5000, this, SLOT(abortTimeout()));
} else {
// No root job, call emitFinished
emitFinished(SyncFileItem::NormalError);
}
// abort() of all jobs will likely have already resulted in finished being emitted, but just in case.
QMetaObject::invokeMethod(this, "emitFinished", Qt::QueuedConnection, Q_ARG(SyncFileItem::Status, SyncFileItem::NormalError));
}
// timeout in seconds
@ -431,6 +480,13 @@ public:
private slots:
void abortTimeout()
{
// Abort synchronously and finish
_rootJob.data()->abort(PropagatorJob::AbortType::Synchronous);
emitFinished(SyncFileItem::NormalError);
}
/** Emit the finished signal and make sure it is only emitted once */
void emitFinished(SyncFileItem::Status status)
{

View File

@ -900,9 +900,13 @@ void PropagateDownloadFile::slotDownloadProgress(qint64 received, qint64)
}
void PropagateDownloadFile::abort()
void PropagateDownloadFile::abort(PropagatorJob::AbortType abortType)
{
if (_job && _job->reply())
_job->reply()->abort();
if (abortType == AbortType::Asynchronous) {
emit abortFinished();
}
}
}

View File

@ -185,7 +185,7 @@ private slots:
/// Called when it's time to update the db metadata
void updateMetadata(bool isConflict);
void abort() Q_DECL_OVERRIDE;
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
void slotDownloadProgress(qint64, qint64);
void slotChecksumFail(const QString &errMsg);

View File

@ -75,10 +75,14 @@ void PropagateRemoteDelete::start()
_job->start();
}
void PropagateRemoteDelete::abort()
void PropagateRemoteDelete::abort(PropagatorJob::AbortType abortType)
{
if (_job && _job->reply())
_job->reply()->abort();
if (abortType == AbortType::Asynchronous) {
emit abortFinished();
}
}
void PropagateRemoteDelete::slotDeleteJobFinished()

View File

@ -52,7 +52,7 @@ public:
{
}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return !_item->isDirectory(); }

View File

@ -60,10 +60,14 @@ void PropagateRemoteMkdir::slotStartMkcolJob()
_job->start();
}
void PropagateRemoteMkdir::abort()
void PropagateRemoteMkdir::abort(PropagatorJob::AbortType abortType)
{
if (_job && _job->reply())
_job->reply()->abort();
if (abortType == AbortType::Asynchronous) {
emit abortFinished();
}
}
void PropagateRemoteMkdir::setDeleteExisting(bool enabled)

View File

@ -35,7 +35,7 @@ public:
{
}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
// Creating a directory should be fast.
bool isLikelyFinishedQuickly() Q_DECL_OVERRIDE { return true; }

View File

@ -117,10 +117,14 @@ void PropagateRemoteMove::start()
_job->start();
}
void PropagateRemoteMove::abort()
void PropagateRemoteMove::abort(PropagatorJob::AbortType abortType)
{
if (_job && _job->reply())
_job->reply()->abort();
if (abortType == AbortType::Asynchronous) {
emit abortFinished();
}
}
void PropagateRemoteMove::slotMoveJobFinished()

View File

@ -56,7 +56,7 @@ public:
{
}
void start() Q_DECL_OVERRIDE;
void abort() Q_DECL_OVERRIDE;
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
JobParallelism parallelism() Q_DECL_OVERRIDE { return _item->isDirectory() ? WaitForFinished : FullParallelism; }
/**

View File

@ -558,20 +558,24 @@ void PropagateUploadFileCommon::slotJobDestroyed(QObject *job)
_jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job), _jobs.end());
}
void PropagateUploadFileCommon::abort()
void PropagateUploadFileCommon::abort(PropagatorJob::AbortType abortType)
{
foreach (auto *job, _jobs) {
if (job->reply()) {
job->reply()->abort();
}
}
if (abortType == AbortType::Asynchronous) {
emit abortFinished();
}
}
// This function is used whenever there is an error occuring and jobs might be in progress
void PropagateUploadFileCommon::abortWithError(SyncFileItem::Status status, const QString &error)
{
_finished = true;
abort();
abort(AbortType::Synchronous);
done(status, error);
}
@ -625,4 +629,33 @@ void PropagateUploadFileCommon::finalize()
done(SyncFileItem::Success);
}
void PropagateUploadFileCommon::prepareAbort(PropagatorJob::AbortType abortType) {
if (!_jobs.empty()) {
// Count number of jobs to be aborted asynchronously
_abortCount = _jobs.size();
foreach (AbstractNetworkJob *job, _jobs) {
// Check if async abort is requested
if (job->reply() && abortType == AbortType::Asynchronous) {
// Connect to finished signal of job reply
// to asynchonously finish the abort
connect(job->reply(), &QNetworkReply::finished, this, &PropagateUploadFileCommon::slotReplyAbortFinished);
}
}
} else if (abortType == AbortType::Asynchronous) {
// Empty job list, emit abortFinished immedietaly
emit abortFinished();
}
}
void PropagateUploadFileCommon::slotReplyAbortFinished()
{
_abortCount--;
if (_abortCount == 0) {
emit abortFinished();
}
}
}

View File

@ -129,6 +129,11 @@ public:
return true;
}
QIODevice *device()
{
return _device;
}
QString errorString()
{
return _errorString.isEmpty() ? AbstractNetworkJob::errorString() : _errorString;
@ -205,6 +210,7 @@ protected:
QVector<AbstractNetworkJob *> _jobs; /// network jobs that are currently in transit
bool _finished BITFIELD(1); /// Tells that all the jobs have been finished
bool _deleteExisting BITFIELD(1);
quint64 _abortCount; /// Keep track of number of aborted items
// measure the performance of checksum calc and upload
#ifdef WITH_TESTING
@ -218,6 +224,7 @@ public:
: PropagateItemJob(propagator, item)
, _finished(false)
, _deleteExisting(false)
, _abortCount(0)
{
}
@ -248,13 +255,20 @@ public:
void abortWithError(SyncFileItem::Status status, const QString &error);
public slots:
void abort() Q_DECL_OVERRIDE;
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
void slotJobDestroyed(QObject *job);
private slots:
void slotReplyAbortFinished();
void slotPollFinished();
protected:
/**
* Prepares the abort e.g. connects proper signals and slots
* to the subjobs to abort asynchronously
*/
void prepareAbort(PropagatorJob::AbortType abortType);
/**
* Checks whether the current error is one that should reset the whole
* transfer if it happens too often. If so: Bump UploadInfo::errorCount
@ -303,7 +317,6 @@ private:
return propagator()->syncOptions()._initialChunkSize;
}
public:
PropagateUploadFileV1(OwncloudPropagator *propagator, const SyncFileItemPtr &item)
: PropagateUploadFileCommon(propagator, item)
@ -311,7 +324,8 @@ public:
}
void doStartUpload() Q_DECL_OVERRIDE;
public slots:
void abort(PropagatorJob::AbortType abortType) Q_DECL_OVERRIDE;
private slots:
void startNextChunk();
void slotPutFinished();
@ -361,6 +375,8 @@ public:
private:
void startNewUpload();
void startNextChunk();
public slots:
void abort(AbortType abortType) Q_DECL_OVERRIDE;
private slots:
void slotPropfindFinished();
void slotPropfindFinishedWithError();

View File

@ -491,4 +491,26 @@ void PropagateUploadFileNG::slotUploadProgress(qint64 sent, qint64 total)
}
propagator()->reportProgress(*_item, _sent + sent - total);
}
void PropagateUploadFileNG::abort(PropagatorJob::AbortType abortType)
{
// Prepare abort
prepareAbort(abortType);
// Abort all jobs (if there are any left), except final PUT
foreach (AbstractNetworkJob *job, _jobs) {
if (job->reply()) {
if (abortType == AbortType::Asynchronous && qobject_cast<MoveJob *>(job)){
// If it is async abort, dont abort
// MoveJob since it might result in conflict,
// only PUT and MKDIR jobs can be safely aborted.
continue;
}
// Abort the job
job->reply()->abort();
}
}
}
}

View File

@ -350,4 +350,29 @@ void PropagateUploadFileV1::slotUploadProgress(qint64 sent, qint64 total)
}
propagator()->reportProgress(*_item, amount);
}
void PropagateUploadFileV1::abort(PropagatorJob::AbortType abortType)
{
// Prepare abort
prepareAbort(abortType);
// Abort all jobs (if there are any left), except final PUT
foreach (AbstractNetworkJob *job, _jobs) {
if (job->reply()) {
// If asynchronous abort allowed,
// dont abort final PUT which uploaded its data,
// since this might result in conflicts
if (PUTFileJob *putJob = qobject_cast<PUTFileJob *>(job)){
if (abortType == AbortType::Asynchronous && (((_currentChunk + _startChunk) % _chunkCount) == 0)
&& putJob->device()->atEnd()) {
continue;
}
}
// Abort the job
job->reply()->abort();
}
}
}
}