From a1558100b8345e07026bfc4298fe64895df3aeea Mon Sep 17 00:00:00 2001 From: Olivier Goffart Date: Tue, 2 Aug 2016 13:48:56 +0200 Subject: [PATCH] WIP: new chunking algorithm Current limitations of this WiP - No resuming implemented yet - No parallel chunks - Hackish way to get the webdav paths --- src/libsync/CMakeLists.txt | 1 + src/libsync/account.cpp | 7 +- src/libsync/account.h | 3 + src/libsync/networkjobs.cpp | 8 +- src/libsync/networkjobs.h | 2 + src/libsync/propagateremotemove.cpp | 10 +- src/libsync/propagateremotemove.h | 4 + src/libsync/propagateupload.cpp | 3 +- src/libsync/propagateupload.h | 40 ++++ src/libsync/propagateuploadng.cpp | 351 ++++++++++++++++++++++++++++ 10 files changed, 424 insertions(+), 5 deletions(-) create mode 100644 src/libsync/propagateuploadng.cpp diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index d68ec323c3..bae54bf37c 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -52,6 +52,7 @@ set(libsync_SRCS propagatedownload.cpp propagateupload.cpp propagateuploadv1.cpp + propagateuploadng.cpp propagateremotedelete.cpp propagateremotemove.cpp propagateremotemkdir.cpp diff --git a/src/libsync/account.cpp b/src/libsync/account.cpp index 510dfd3aa8..07ab0970f6 100644 --- a/src/libsync/account.cpp +++ b/src/libsync/account.cpp @@ -76,11 +76,14 @@ AccountPtr Account::sharedFromThis() return _sharedThis.toStrongRef(); } +QString Account::user() const +{ + return _credentials->user(); +} QString Account::displayName() const { - auto user = _credentials->user(); - QString dn = QString("%1@%2").arg(user, _url.host()); + QString dn = QString("%1@%2").arg(user(), _url.host()); int port = url().port(); if (port > 0 && port != 80 && port != 443) { dn.append(QLatin1Char(':')); diff --git a/src/libsync/account.h b/src/libsync/account.h index c4852e3aea..797cd14227 100644 --- a/src/libsync/account.h +++ b/src/libsync/account.h @@ -75,6 +75,9 @@ public: void setSharedThis(AccountPtr sharedThis); AccountPtr sharedFromThis(); + /// The user that can be used in dav url + QString user() const; + /// The name of the account as shown in the toolbar QString displayName() const; diff --git a/src/libsync/networkjobs.cpp b/src/libsync/networkjobs.cpp index ab55984685..5fd3c83a63 100644 --- a/src/libsync/networkjobs.cpp +++ b/src/libsync/networkjobs.cpp @@ -106,6 +106,11 @@ MkColJob::MkColJob(AccountPtr account, const QString &path, QObject *parent) { } +MkColJob::MkColJob(AccountPtr account, const QUrl &url, QObject *parent) + : AbstractNetworkJob(account, QString(), parent), _url(url) +{ +} + void MkColJob::start() { // add 'Content-Length: 0' header (see https://github.com/owncloud/client/issues/3256) @@ -113,7 +118,8 @@ void MkColJob::start() req.setRawHeader("Content-Length", "0"); // assumes ownership - QNetworkReply *reply = davRequest("MKCOL", path(), req); + QNetworkReply *reply = _url.isValid() ? davRequest("MKCOL", _url, req) + : davRequest("MKCOL", path(), req); setReply(reply); setupConnections(reply); AbstractNetworkJob::start(); diff --git a/src/libsync/networkjobs.h b/src/libsync/networkjobs.h index c2cc978c4d..94deb3af16 100644 --- a/src/libsync/networkjobs.h +++ b/src/libsync/networkjobs.h @@ -170,8 +170,10 @@ private: */ class OWNCLOUDSYNC_EXPORT MkColJob : public AbstractNetworkJob { Q_OBJECT + QUrl _url; // Only used if the constructor taking a url is taken. public: explicit MkColJob(AccountPtr account, const QString &path, QObject *parent = 0); + explicit MkColJob(AccountPtr account, const QUrl &url, QObject *parent = 0); void start() Q_DECL_OVERRIDE; signals: diff --git a/src/libsync/propagateremotemove.cpp b/src/libsync/propagateremotemove.cpp index 5a75e761e9..4bc8e3a947 100644 --- a/src/libsync/propagateremotemove.cpp +++ b/src/libsync/propagateremotemove.cpp @@ -27,12 +27,20 @@ MoveJob::MoveJob(AccountPtr account, const QString& path, : AbstractNetworkJob(account, path, parent), _destination(destination) { } +MoveJob::MoveJob(AccountPtr account, const QUrl& url, const QString &destination, + QMap extraHeaders, QObject* parent) + : AbstractNetworkJob(account, QString(), parent), _destination(destination), _url(url) + , _extraHeaders(extraHeaders) +{ } void MoveJob::start() { QNetworkRequest req; req.setRawHeader("Destination", QUrl::toPercentEncoding(_destination, "/")); - setReply(davRequest("MOVE", path(), req)); + for(auto it = _extraHeaders.constBegin(); it != _extraHeaders.constEnd(); ++it) { + req.setRawHeader(it.key(), it.value()); + } + setReply(_url.isValid() ? davRequest("MOVE", _url, req) : davRequest("MOVE", path(), req)); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { diff --git a/src/libsync/propagateremotemove.h b/src/libsync/propagateremotemove.h index 3ce54f4a80..4146d4b17f 100644 --- a/src/libsync/propagateremotemove.h +++ b/src/libsync/propagateremotemove.h @@ -25,8 +25,12 @@ namespace OCC { class MoveJob : public AbstractNetworkJob { Q_OBJECT const QString _destination; + const QUrl _url; // Only used (instead of path) when the constructor taking an URL is used + QMap _extraHeaders; public: explicit MoveJob(AccountPtr account, const QString& path, const QString &destination, QObject* parent = 0); + explicit MoveJob(AccountPtr account, const QUrl& url, const QString &destination, + QMap _extraHeaders, QObject* parent = 0); void start() Q_DECL_OVERRIDE; bool finished() Q_DECL_OVERRIDE; diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index 485937f66f..388752ad93 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -72,7 +72,8 @@ void PUTFileJob::start() { req.setRawHeader(it.key(), it.value()); } - setReply(davRequest("PUT", path(), req, _device.data())); + setReply(_url.isValid() ? davRequest("PUT", _url, req, _device.data()) + : davRequest("PUT", path(), req, _device.data())); setupConnections(reply()); if( reply()->error() != QNetworkReply::NoError ) { diff --git a/src/libsync/propagateupload.h b/src/libsync/propagateupload.h index ea9f546b8f..4a06f2f131 100644 --- a/src/libsync/propagateupload.h +++ b/src/libsync/propagateupload.h @@ -89,12 +89,17 @@ private: QScopedPointer _device; QMap _headers; QString _errorString; + QUrl _url; public: // Takes ownership of the device explicit PUTFileJob(AccountPtr account, const QString& path, QIODevice *device, const QMap &headers, int chunk, QObject* parent = 0) : AbstractNetworkJob(account, path, parent), _device(device), _headers(headers), _chunk(chunk) {} + explicit PUTFileJob(AccountPtr account, const QUrl& url, QIODevice *device, + const QMap &headers, int chunk, QObject* parent = 0) + : AbstractNetworkJob(account, QString(), parent), _device(device), _headers(headers) + , _url(url), _chunk(chunk) {} ~PUTFileJob(); int _chunk; @@ -270,5 +275,40 @@ private slots: void slotUploadProgress(qint64,qint64); }; +/** + * @ingroup libsync + * + * Propagation job, impementing the new chunking agorithm + * + */ +class PropagateUploadFileNG : public PropagateUploadFileCommon { + Q_OBJECT +private: + quint64 _sent; /// amount of data that was already sent + uint _transferId; /// transfer id (part of the url) + int _currentChunk; + + quint64 chunkSize() const { return _propagator->chunkSize(); } + /** + * Return the URL of a chunk. + * If chunk == -1, returns the URL of the parent folder containing the chunks + */ + QUrl chunkUrl(int chunk = -1); + +public: + PropagateUploadFileNG(OwncloudPropagator* propagator,const SyncFileItemPtr& item) : + PropagateUploadFileCommon(propagator,item) {} + + void doStartUpload() Q_DECL_OVERRIDE; + +private slots: + void slotMkColFinished(QNetworkReply::NetworkError); + void startNextChunk(); + void slotPutFinished(); + void slotMoveJobFinished(); + void slotUploadProgress(qint64,qint64); +}; + + } diff --git a/src/libsync/propagateuploadng.cpp b/src/libsync/propagateuploadng.cpp new file mode 100644 index 0000000000..d2f89aba86 --- /dev/null +++ b/src/libsync/propagateuploadng.cpp @@ -0,0 +1,351 @@ +/* + * Copyright (C) by Olivier Goffart + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "config.h" +#include "propagateupload.h" +#include "owncloudpropagator_p.h" +#include "networkjobs.h" +#include "account.h" +#include "syncjournaldb.h" +#include "syncjournalfilerecord.h" +#include "utility.h" +#include "filesystem.h" +#include "propagatorjobs.h" +#include "syncengine.h" +#include "propagateremotemove.h" + +#include +#include +#include +#include +#include + +namespace OCC { + +QUrl PropagateUploadFileNG::chunkUrl(int chunk) +{ + // FIXME! we should not use the user from the credential, we should have it in the account + QString path = QLatin1String("remote.php/dav/uploads/") + + _propagator->account()->user() + + QLatin1Char('/') + QString::number(_transferId); + if (chunk >= 0) { + path += QLatin1Char('/') + QString::number(chunk); + } + return Account::concatUrlPath(_propagator->account()->url(), path); +} + +/* + State machine: + + *----> doStartUpload() + Check the db: is there an entry? + / \ + no yes + / \ + MKCOL PROPFIND (TODO) + | + slotMkColFinished() : + | : + +-----+---------------------+ + | + +----> startNextChunk() ---finished? --+ + ^ | | + +---------------+ | + | + +----------------------------------------+ + | + +-> MOVE ------> moveJobFinished() ---> finalize() + + + */ + + + +void PropagateUploadFileNG::doStartUpload() +{ + _transferId = qrand() ^ _item->_modtime ^ (_item->_size << 16) ^ qHash(_item->_file); + +#if FUTURE + const SyncJournalDb::UploadInfo progressInfo = _propagator->_journal->getUploadInfo(_item->_file); + + if (progressInfo._valid && Utility::qDateTimeToTime_t(progressInfo._modtime) == _item->_modtime ) { + _transferId = progressInfo._transferid; + // TODO: make a PROPFIND call to check what the size on the server is + //_startChunk = progressInfo._chunk; + //qDebug() << Q_FUNC_INFO << _item->_file << ": Resuming from chunk " << _startChunk; + + } +#endif + _propagator->_activeJobList.append(this); + + _sent = 0; + _currentChunk = 0; + _duration.start(); + + emit progress(*_item, 0); + + auto job = new MkColJob(_propagator->account(), + chunkUrl(), + this); + + connect(job, SIGNAL(finished(QNetworkReply::NetworkError)), + this, SLOT(slotMkColFinished(QNetworkReply::NetworkError))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); +} + +void PropagateUploadFileNG::slotMkColFinished(QNetworkReply::NetworkError) +{ + _propagator->_activeJobList.removeOne(this); + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + + if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) { + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + QString errorString = errorMessage(job->reply()->errorString(), job->reply()->readAll()); + if (job->reply()->hasRawHeader("OC-ErrorString")) { + errorString = job->reply()->rawHeader("OC-ErrorString"); + } + abortWithError(status, errorString); + return; + } + startNextChunk(); +} + +void PropagateUploadFileNG::startNextChunk() +{ + if (_propagator->_abortRequested.fetchAndAddRelaxed(0)) + return; + + quint64 fileSize = _item->_size; + + quint64 currentChunkSize = qMin(chunkSize(), fileSize - _sent); + + if (currentChunkSize <= 0) { + Q_ASSERT(_jobs.isEmpty()); // There should be no running job anymore + _finished = true; + // Finish with a MOVE + QString destination = _propagator->_remoteDir + _item->_file; + auto headers = PropagateUploadFileCommon::headers(); + auto job = new MoveJob(_propagator->account(), Account::concatUrlPath(chunkUrl(), "/.file"), + destination, headers, this); + _jobs.append(job); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotMoveJobFinished())); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + _propagator->_activeJobList.append(this); + job->start(); + return; + } + + auto device = new UploadDevice(&_propagator->_bandwidthManager); + const QString fileName = _propagator->getFilePath(_item->_file); + + if (! device->prepareAndOpen(fileName, _sent, currentChunkSize)) { + qDebug() << "ERR: Could not prepare upload device: " << device->errorString(); + + // If the file is currently locked, we want to retry the sync + // when it becomes available again. + if (FileSystem::isFileLocked(fileName)) { + emit _propagator->seenLockedFile(fileName); + } + // Soft error because this is likely caused by the user modifying his files while syncing + abortWithError( SyncFileItem::SoftError, device->errorString() ); + return; + } + + _sent += currentChunkSize; + QUrl url = chunkUrl(_currentChunk); + + // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing + PUTFileJob* job = new PUTFileJob(_propagator->account(), url, device, {}, _currentChunk); + _jobs.append(job); + connect(job, SIGNAL(finishedSignal()), this, SLOT(slotPutFinished())); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), + this, SLOT(slotUploadProgress(qint64,qint64))); + connect(job, SIGNAL(uploadProgress(qint64,qint64)), + device, SLOT(slotJobUploadProgress(qint64,qint64))); + connect(job, SIGNAL(destroyed(QObject*)), this, SLOT(slotJobDestroyed(QObject*))); + job->start(); + _propagator->_activeJobList.append(this); + _currentChunk++; + + // FIXME! parallel chunk? + +} + +void PropagateUploadFileNG::slotPutFinished() +{ + PUTFileJob *job = qobject_cast(sender()); + Q_ASSERT(job); + slotJobDestroyed(job); // remove it from the _jobs list + + qDebug() << job->reply()->request().url() << "FINISHED WITH STATUS" + << job->reply()->error() + << (job->reply()->error() == QNetworkReply::NoError ? QLatin1String("") : job->reply()->errorString()) + << job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) + << job->reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); + + _propagator->_activeJobList.removeOne(this); + + if (_finished) { + // We have sent the finished signal already. We don't need to handle any remaining jobs + return; + } + + QNetworkReply::NetworkError err = job->reply()->error(); + +#if QT_VERSION < QT_VERSION_CHECK(5, 4, 2) + if (err == QNetworkReply::OperationCanceledError && job->reply()->property(owncloudShouldSoftCancelPropertyName).isValid()) { + // Abort the job and try again later. + // This works around a bug in QNAM wich might reuse a non-empty buffer for the next request. + qDebug() << "Forcing job abort on HTTP connection reset with Qt < 5.4.2."; + _propagator->_anotherSyncNeeded = true; + abortWithError(SyncFileItem::SoftError, tr("Forcing job abort on HTTP connection reset with Qt < 5.4.2.")); + return; + } +#endif + + if (err != QNetworkReply::NoError) { + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + QByteArray replyContent = job->reply()->readAll(); + qDebug() << replyContent; // display the XML error in the debug + QString errorString = errorMessage(job->errorString(), replyContent); + + if (job->reply()->hasRawHeader("OC-ErrorString")) { + errorString = job->reply()->rawHeader("OC-ErrorString"); + } + + // FIXME! can tth peneunking? + if (_item->_httpErrorCode == 412) { + // Precondition Failed: Maybe the bad etag is in the database, we need to clear the + // parent folder etag so we won't read from DB next sync. + _propagator->_journal->avoidReadFromDbOnNextSync(_item->_file); + _propagator->_anotherSyncNeeded = true; + } + + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + abortWithError(status, errorString); + return; + } + + bool finished = _sent >= _item->_size; + + // Check if the file still exists + const QString fullFilePath(_propagator->getFilePath(_item->_file)); + if( !FileSystem::fileExists(fullFilePath) ) { + if (!finished) { + abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync.")); + return; + } else { + _propagator->_anotherSyncNeeded = true; + } + } + + // Check whether the file changed since discovery. + if (! FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) { + _propagator->_anotherSyncNeeded = true; + if( !finished ) { + abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync.")); + // FIXME: the legacy code was retrying for a few seconds. + // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW + return; + } + } + + if (!finished) { + // Deletes an existing blacklist entry on successful chunk upload + if (_item->_hasBlacklistEntry) { + _propagator->_journal->wipeErrorBlacklistEntry(_item->_file); + _item->_hasBlacklistEntry = false; + } + + SyncJournalDb::UploadInfo pi; + pi._valid = true; + auto currentChunk = job->_chunk; + foreach (auto *job, _jobs) { + // Take the minimum finished one + if (auto putJob = qobject_cast(job)) { + currentChunk = qMin(currentChunk, putJob->_chunk - 1); + } + } + pi._chunk = currentChunk; // FIXME + pi._transferid = _transferId; + pi._modtime = Utility::qDateTimeFromTime_t(_item->_modtime); + _propagator->_journal->setUploadInfo(_item->_file, pi); + _propagator->_journal->commit("Upload info"); + } + startNextChunk(); +} + +void PropagateUploadFileNG::slotMoveJobFinished() +{ + _propagator->_activeJobList.removeOne(this); + auto job = qobject_cast(sender()); + slotJobDestroyed(job); // remove it from the _jobs list + QNetworkReply::NetworkError err = job->reply()->error(); + _item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); + + if (err != QNetworkReply::NoError || _item->_httpErrorCode != 201) { + SyncFileItem::Status status = classifyError(err, _item->_httpErrorCode, + &_propagator->_anotherSyncNeeded); + QString errorString = errorMessage(job->errorString(), job->reply()->readAll()); + abortWithError(status, errorString); + return; + } + + QByteArray fid = job->reply()->rawHeader("OC-FileID"); + if(fid.isEmpty()) { + qWarning() << "Server did not return a OC-FileID" << _item->_file; + } else { + // the old file id should only be empty for new files uploaded + if( !_item->_fileId.isEmpty() && _item->_fileId != fid ) { + qDebug() << "WARN: File ID changed!" << _item->_fileId << fid; + } + _item->_fileId = fid; + } + + _item->_etag = getEtagFromReply(job->reply());; + if (_item->_etag.isEmpty()) { + qWarning() << "Server did not return an ETAG" << _item->_file; + } + _item->_responseTimeStamp = job->responseTimestamp(); + + // performance logging + _item->_requestDuration = _stopWatch.stop(); + qDebug() << "*==* duration UPLOAD" << _item->_size + << _stopWatch.durationOfLap(QLatin1String("ContentChecksum")) + << _stopWatch.durationOfLap(QLatin1String("TransmissionChecksum")) + << _item->_requestDuration; + // The job might stay alive for the whole sync, release this tiny bit of memory. + _stopWatch.reset(); + finalize(); +} + +void PropagateUploadFileNG::slotUploadProgress(qint64 sent, qint64 total) +{ + // Completion is signaled with sent=0, total=0; avoid accidentally + // resetting progress due to the sent being zero by ignoring it. + // finishedSignal() is bound to be emitted soon anyway. + // See https://bugreports.qt.io/browse/QTBUG-44782. + if (sent == 0 && total == 0) { + return; + } + emit progress(*_item, _sent + sent - total); +} + +}