progress with new propagator

This commit is contained in:
Olivier Goffart 2013-08-14 19:59:16 +02:00
parent f8e6326880
commit e4128cd5d8
7 changed files with 143 additions and 70 deletions

View File

@ -194,6 +194,7 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote )
item._isDirectory = file->type == CSYNC_FTW_TYPE_DIR;
item._modtime = file->modtime;
item._etag = file->md5;
item._size = file->size;
SyncFileItem::Direction dir;
@ -227,6 +228,9 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote )
dir = !remote ? SyncFileItem::Down : SyncFileItem::Up;
break;
case CSYNC_INSTRUCTION_CONFLICT:
_progressInfo.overall_file_count++;
_progressInfo.overall_transmission_size += file->size;
//fall trough
case CSYNC_INSTRUCTION_IGNORE:
case CSYNC_INSTRUCTION_ERROR:
dir = SyncFileItem::None;
@ -234,6 +238,9 @@ int CSyncThread::treewalkFile( TREE_WALK_FILE *file, bool remote )
case CSYNC_INSTRUCTION_EVAL:
case CSYNC_INSTRUCTION_NEW:
case CSYNC_INSTRUCTION_SYNC:
_progressInfo.overall_file_count++;
_progressInfo.overall_transmission_size += file->size;
//fall trough
case CSYNC_INSTRUCTION_STAT_ERROR:
case CSYNC_INSTRUCTION_DELETED:
case CSYNC_INSTRUCTION_UPDATED:
@ -272,6 +279,9 @@ int CSyncThread::treewalkFinalize(TREE_WALK_FILE* file)
if (action != _performedActions.constEnd()) {
if (file->instruction != CSYNC_INSTRUCTION_NONE) {
// it is NONE if we are in the wrong tree (remote vs. local)
qDebug() << "UPDATING " << file->path << action->instruction;
file->instruction = action->instruction;
}
@ -325,22 +335,6 @@ void CSyncThread::startSync()
// maybe move this somewhere else where it can influence a running sync?
MirallConfigFile cfg;
int downloadLimit = 0;
if (cfg.useDownloadLimit()) {
downloadLimit = cfg.downloadLimit() * 1000;
}
csync_set_module_property(_csync_ctx, "bandwidth_limit_download", &downloadLimit);
int uploadLimit = -75; // 75%
int useUpLimit = cfg.useUploadLimit();
if ( useUpLimit >= 1) {
uploadLimit = cfg.uploadLimit() * 1000;
} else if (useUpLimit == 0) {
uploadLimit = 0;
}
csync_set_module_property(_csync_ctx, "bandwidth_limit_upload", &uploadLimit);
csync_set_progress_callback( _csync_ctx, cb_progress );
csync_set_module_property(_csync_ctx, "csync_context", _csync_ctx);
csync_set_userdata(_csync_ctx, this);
@ -383,6 +377,8 @@ void CSyncThread::startSync()
return;
}
_progressInfo = Progress::Info();
_hasFiles = false;
bool walkOk = true;
if( csync_walk_local_tree(_csync_ctx, &treewalkLocal, 0) < 0 ) {
@ -423,13 +419,28 @@ void CSyncThread::startSync()
_propagator.reset(new OwncloudPropagator (session, _localPath, _remotePath, &_progressDataBase));
connect(_propagator.data(), SIGNAL(completed(SyncFileItem, CSYNC_ERROR_CODE)),
this, SLOT(transferCompleted(SyncFileItem, CSYNC_ERROR_CODE)), Qt::QueuedConnection);
connect(_propagator.data(), SIGNAL(progress(Progress::Kind,QString,quint64,quint64)),
this, SLOT(slotProgress(Progress::Kind,QString,quint64,quint64)));
_iterator = 0;
startNextTransfer();
// if( csync_propagate(_csync_ctx) < 0 ) {
// handleSyncError(_csync_ctx, "cysnc_reconcile");
// return;
// }
int downloadLimit = 0;
if (cfg.useDownloadLimit()) {
downloadLimit = cfg.downloadLimit() * 1000;
}
_propagator->_downloadLimit = downloadLimit;
int uploadLimit = -75; // 75%
int useUpLimit = cfg.useUploadLimit();
if ( useUpLimit >= 1) {
uploadLimit = cfg.uploadLimit() * 1000;
} else if (useUpLimit == 0) {
uploadLimit = 0;
}
_propagator->_uploadLimit = uploadLimit;
slotProgress(Progress::StartSync, QString(), 0, 0);
startNextTransfer();
}
void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE error)
@ -447,7 +458,7 @@ void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE e
if (idx >= 0) {
_syncedItems[idx]._instruction = CSYNC_INSTRUCTION_ERROR;
_syncedItems[idx]._errorString = csyncErrorToString( error );
_syncedItems[idx]._errorDetail = item._errorString;
_syncedItems[idx]._errorDetail = item._errorDetail;
_syncedItems[idx]._httpCode = item._httpCode;
qDebug() << "File " << item._file << " propagator error " << _syncedItems[idx]._errorString
<< "(" << item._errorString << ")";
@ -474,13 +485,13 @@ void CSyncThread::transferCompleted(const SyncFileItem &item, CSYNC_ERROR_CODE e
_performedActions.insert(item._renameTarget.toUtf8(), a);
}
if (!item._isDirectory && a.instruction == CSYNC_INSTRUCTION_UPDATED
&& item._dir == SyncFileItem::Down) {
emit fileReceived(item._file);
if (!item._isDirectory && a.instruction == CSYNC_INSTRUCTION_UPDATED) {
slotProgress((item._dir != SyncFileItem::Up) ? Progress::EndDownload : Progress::EndUpload,
item._file, item._size, item._size);
_progressInfo.current_file_no++;
_progressInfo.overall_current_bytes += item._size;
}
//TODO progress %;
startNextTransfer();
}
@ -499,6 +510,13 @@ void CSyncThread::startNextTransfer()
continue;
}
_propagator->_etag.clear(); // FIXME : set to the right one
if (item._instruction == CSYNC_INSTRUCTION_SYNC || item._instruction == CSYNC_INSTRUCTION_NEW
|| item._instruction == CSYNC_INSTRUCTION_CONFLICT) {
slotProgress((item._dir != SyncFileItem::Up) ? Progress::StartDownload : Progress::StartUpload,
item._file, 0, item._size);
}
_propagator->propagate(item);
return; //propagate is async.
}
@ -517,6 +535,7 @@ void CSyncThread::startNextTransfer()
csync_commit(_csync_ctx);
qDebug() << "CSync run took " << _syncTime.elapsed() << " Milliseconds";
slotProgress(Progress::EndSync,QString(), 0 , 0);
emit finished();
_propagator.reset(0);
_syncMutex.unlock();
@ -567,32 +586,20 @@ Progress::Kind CSyncThread::csyncToProgressKind( enum csync_notify_type_e kind )
return pKind;
}
void CSyncThread::cb_progress( CSYNC_PROGRESS *progress, void *userdata )
void CSyncThread::slotProgress(Progress::Kind kind, const QString &file, quint64 curr, quint64 total)
{
if( !progress ) {
qDebug() << "No progress block in progress callback found!";
return;
}
if( !userdata ) {
qDebug() << "No thread given in progress callback!";
return;
}
Progress::Info pInfo;
CSyncThread *thread = static_cast<CSyncThread*>(userdata);
Progress::Info pInfo = _progressInfo;
pInfo.kind = thread->csyncToProgressKind( progress->kind );
pInfo.current_file = QUrl::fromEncoded( progress->path ).toString();
pInfo.file_size = progress->file_size;
pInfo.current_file_bytes = progress->curr_bytes;
pInfo.kind = kind;
pInfo.current_file = file;
pInfo.file_size = total;
pInfo.current_file_bytes = curr;
pInfo.overall_file_count = progress->overall_file_count;
pInfo.current_file_no = progress->current_file_no;
pInfo.overall_transmission_size = progress->overall_transmission_size;
pInfo.overall_current_bytes = progress->current_overall_bytes;
pInfo.overall_current_bytes += curr;
pInfo.timestamp = QDateTime::currentDateTime();
// Connect to something in folder!
thread->transmissionProgress( pInfo );
transmissionProgress( pInfo );
}
/* Given a path on the remote, give the path as it is when the rename is done */

View File

@ -60,8 +60,6 @@ public:
Q_INVOKABLE void startSync();
signals:
void fileReceived( const QString& );
void fileRemoved( const QString& );
void csyncError( const QString& );
void csyncWarning( const QString& );
void csyncUnavailable();
@ -79,12 +77,11 @@ signals:
private slots:
void transferCompleted(const SyncFileItem& item, CSYNC_ERROR_CODE error);
void startNextTransfer();
void slotProgress(Progress::Kind kind, const QString& file, quint64, quint64);
private:
void handleSyncError(CSYNC *ctx, const char *state);
static void cb_progress( CSYNC_PROGRESS *progress, void *userdata );
static int treewalkLocal( TREE_WALK_FILE*, void *);
static int treewalkRemote( TREE_WALK_FILE*, void *);
int treewalkFile( TREE_WALK_FILE*, bool );
@ -116,6 +113,9 @@ private:
QString adjustRenamedPath(const QString &original);
bool _hasFiles; // true if there is at least one file that is not ignored or removed
Progress::Info _progressInfo;
int _downloadLimit;
int _uploadLimit;
friend struct CSyncRunScopeHelper;
};

View File

@ -488,7 +488,7 @@ void Folder::startSync(const QStringList &pathList)
_thread = new QThread(this);
_thread->setPriority(QThread::LowPriority);
setIgnoredFiles();
_csync = new CSyncThread( _csync_ctx, path(), QUrl(Folder::secondPath()).path());
_csync = new CSyncThread( _csync_ctx, path(), QUrl(ownCloudInfo::instance()->webdavUrl() + secondPath()).path());
_csync->moveToThread(_thread);

View File

@ -235,11 +235,15 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item)
_progressDb->remove(item._file);
}
//TODO
//ne_set_notifier(dav_session.ctx, ne_notify_status_cb, write_ctx);
ne_set_notifier(_session, notify_status_cb, this);
_lastTime.restart();
_lastProgress = 0;
_chunked_done = 0;
_chunked_total_size = item._size;
_currentFile = item._file;
if( state == HBF_SUCCESS ) {
//chunked_total_size = trans->stat_size;
_chunked_total_size = trans->stat_size;
/* Transfer all the chunks through the HTTP session using PUT. */
state = hbf_transfer( _session, trans.data(), "PUT" );
}
@ -272,14 +276,10 @@ csync_instructions_e OwncloudPropagator::uploadFile(const SyncFileItem &item)
}
} while( !finished );
// if (_progresscb) {
// ne_set_notifier(dav_session.ctx, 0, 0);
// _progresscb(write_ctx->url, rc != 0 ? CSYNC_NOTIFY_ERROR :
// CSYNC_NOTIFY_FINISHED_UPLOAD, error_code,
// (long long)(error_string), dav_session.userdata);
// }
ne_set_notifier(_session, 0, 0);
updateMTimeAndETag(uri.data(), item._modtime);
return CSYNC_INSTRUCTION_UPDATED;
}
@ -421,8 +421,6 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item,
/* actually do the request */
int retry = 0;
// ne_set_notifier(dav_session.ctx, ne_notify_status_cb, write_ctx);
QScopedPointer<char, QScopedPointerPodDeleter> uri(ne_path_escape((_remoteDir + item._file).toUtf8()));
DownloadContext writeCtx(&tmpFile);
@ -444,6 +442,11 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item,
* either the compressed- or uncompressed reader.
*/
ne_hook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx);
ne_set_notifier(_session, notify_status_cb, this);
_lastProgress = 0;
_lastTime.start();
_chunked_done = _chunked_total_size = 0;
_currentFile = item._file;
int neon_stat = ne_request_dispatch(req.data());
@ -453,7 +456,8 @@ csync_instructions_e OwncloudPropagator::downloadFile(const SyncFileItem &item,
/* delete the hook again, otherwise they get chained as they are with the session */
ne_unhook_post_headers( _session, DownloadContext::install_content_reader, &writeCtx );
// ne_set_notifier(_session, 0, 0);
ne_set_notifier(_session, 0, 0);
_chunked_done = _chunked_total_size = 0;
if( updateErrorFromSession(neon_stat, req.data() ) ) {
qDebug("Error GET: Neon: %d", neon_stat);
@ -648,6 +652,49 @@ bool OwncloudPropagator::updateErrorFromSession(int neon_code, ne_request *req)
return re;
}
void OwncloudPropagator::notify_status_cb(void* userdata, ne_session_status status,
const ne_session_status_info* info)
{
OwncloudPropagator* this_ = reinterpret_cast<OwncloudPropagator *>(userdata);
if ((status == ne_status_sending || status == ne_status_recving)) {
if (info->sr.total > 0) {
emit this_->progress(Progress::Context, this_->_currentFile,
this_->_chunked_done + info->sr.progress,
this_->_chunked_total_size ? this_->_chunked_total_size : info->sr.total );
}
if (this_->_chunked_total_size && info->sr.total > 0 && info->sr.total == info->sr.progress) {
this_->_chunked_done += info->sr.total;
}
}
/* throttle connection */
int bandwidth_limit = 0;
if (status == ne_status_sending) bandwidth_limit = this_->_uploadLimit;
if (status == ne_status_recving) bandwidth_limit = this_->_downloadLimit;
if (bandwidth_limit > 0) {
int64_t diff = this_->_lastTime.nsecsElapsed() / 1000;
int64_t len = info->sr.progress - this_->_lastProgress;
if (len > 0 && diff > 0 && (1000000 * len / diff) > (int64_t)bandwidth_limit) {
int64_t wait_time = (1000000 * len / bandwidth_limit) - diff;
if (wait_time > 0) {
usleep(wait_time);
}
}
this_->_lastProgress = info->sr.progress;
this_->_lastTime.start();
} else if (bandwidth_limit < 0 && bandwidth_limit > -100) {
int64_t diff = this_->_lastTime.nsecsElapsed() / 1000;
if (diff > 0) {
// -bandwidth_limit is the % of bandwidth
int64_t wait_time = -diff * (1 + 100.0 / bandwidth_limit);
if (wait_time > 0) {
usleep(wait_time);
}
}
this_->_lastTime.start();
}
}
}

View File

@ -18,8 +18,10 @@
#include <neon/ne_request.h>
#include <QHash>
#include <QObject>
#include <qelapsedtimer.h>
#include "syncfileitem.h"
#include "progressdispatcher.h"
struct ne_session_s;
struct ne_decompress_s;
@ -58,6 +60,15 @@ class OwncloudPropagator : public QObject {
bool updateErrorFromSession(int neon_code = 0, ne_request *req = NULL);
QElapsedTimer _lastTime;
quint64 _lastProgress;
quint64 _chunked_total_size;
quint64 _chunked_done;
QString _currentFile;
static void notify_status_cb (void *userdata, ne_session_status status,
const ne_session_status_info *info);
public:
OwncloudPropagator(ne_session_s *session, const QString &localDir, const QString &remoteDir,
ProgressDatabase *progressDb)
@ -76,8 +87,12 @@ public:
QByteArray _etag;
bool _hasFatalError;
int _downloadLimit;
int _uploadLimit;
signals:
void completed(const SyncFileItem &, CSYNC_ERROR_CODE);
void progress(Progress::Kind, const QString &filename, quint64 bytes, quint64 total);
};

View File

@ -28,7 +28,7 @@ namespace Mirall {
class Progress
{
public:
typedef enum {
enum Kind {
Invalid,
StartSync,
Download,
@ -43,9 +43,9 @@ public:
StartDelete,
EndDelete,
Error
} Kind;
};
typedef struct {
struct Info {
Kind kind;
QString folder;
QString current_file;
@ -59,15 +59,18 @@ public:
QDateTime timestamp;
} Info;
Info() : kind(Invalid), file_size(0), current_file_bytes(0),
overall_file_count(0), current_file_no(0),
overall_transmission_size(0), overall_current_bytes(0) { }
};
typedef struct {
struct SyncProblem {
QString folder;
QString current_file;
QString error_message;
int error_code;
QDateTime timestamp;
} SyncProblem;
};
static QString asActionString( Kind );
static QString asResultString( Kind );

View File

@ -60,6 +60,7 @@ public:
bool _isDirectory;
time_t _modtime;
QByteArray _etag;
quint64 _size;
QString _errorString;
QString _errorDetail;