/************************************************************************* * UrBackup - Client/Server backup system * Copyright (C) 2011 Martin Raiber * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . **************************************************************************/ #include "DirectoryWatcherThread.h" #include "../Interface/Server.h" #include "../stringtools.h" #include "../urbackupcommon/os_functions.h" #include "database.h" #define CHANGE_JOURNAL IPipe *DirectoryWatcherThread::pipe=NULL; IMutex *DirectoryWatcherThread::update_mutex=NULL; ICondition *DirectoryWatcherThread::update_cond=NULL; const unsigned int mindifftime=120000; DirectoryWatcherThread::DirectoryWatcherThread(const std::vector &watchdirs) { do_stop=false; watching=watchdirs; for(size_t i=0;igetDatabase(Server->getThreadID(), URBACKUPDB_CLIENT); q_get_dir_backup=db->Prepare("SELECT id FROM mdirs_backup WHERE name=?"); q_get_dir=db->Prepare("SELECT id FROM mdirs WHERE name=?"); q_add_dir=db->Prepare("INSERT INTO mdirs (name) VALUES (?)"); q_add_dir_with_id=db->Prepare("INSERT INTO mdirs (id, name) VALUES (?, ?)"); q_add_del_dir=db->Prepare("INSERT INTO del_dirs SELECT ? AS NAME WHERE NOT EXISTS (SELECT * FROM del_dirs WHERE name=?)"); q_add_file=db->Prepare("INSERT INTO mfiles SELECT ? AS dir_id, ? AS name WHERE NOT EXISTS (SELECT * FROM mfiles WHERE dir_id=? AND name=?)"); q_update_last_backup_time=db->Prepare("INSERT OR REPLACE INTO misc (tkey, tvalue) VALUES ('last_backup_filetime', ?)"); ChangeListener cl; #ifdef CHANGE_JOURNAL ChangeJournalWatcher dcw(this, db, this); { db_results res = db->Read("SELECT tvalue FROM misc WHERE tkey='last_backup_filetime'"); if(!res.empty()) { dcw.set_last_backup_time(watoi64(res[0][L"tvalue"])); } } #else CDirectoryChangeWatcher dcw(false); #endif for(size_t i=0;iRead(&r, 10000); std::wstring msg; if(r.size()/sizeof(wchar_t)>0) { msg.resize( r.size()/sizeof(wchar_t)); memcpy(&msg[0], &r[0], r.size()); } #ifdef CHANGE_JOURNAL if(r.empty()) { dcw.update(); } #endif if(msg.size()>0 ) { if( msg[0]=='A' ) { std::wstring dir=strlower(addSlashIfMissing(msg.substr(1))); bool w=false; for(size_t i=0;inotify_all(); } #endif else if( msg[0]=='Q' ) { continue; } else if( msg[0]=='R' ) { std::wstring dir=strlower(msg.substr(1)); OnDirRm(dir); } else if( msg[0]=='F' ) { std::wstring fn=msg.substr(1); OnDirMod(strlower(ExtractFilePath(fn)), ExtractFileName(fn)); } #ifdef CHANGE_JOURNAL else if( msg[0]=='t' ) { FILETIME ft; SYSTEMTIME st; GetSystemTime(&st); SystemTimeToFileTime(&st, &ft); last_backup_filetime=static_cast<__int64>(ft.dwHighDateTime) << 32 | ft.dwLowDateTime; dcw.set_last_backup_time(last_backup_filetime); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if( msg[0]=='T' ) { q_update_last_backup_time->Bind(last_backup_filetime); q_update_last_backup_time->Write(); q_update_last_backup_time->Reset(); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if(msg[0]=='K' ) { dcw.set_freeze_open_write_files(true); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if(msg[0]=='H' ) { dcw.set_freeze_open_write_files(false); IScopedLock lock(update_mutex); update_cond->notify_all(); } #endif else { std::wstring dir=strlower(msg.substr(1)); OnDirMod(dir, L""); } } } db->destroyAllQueries(); } void DirectoryWatcherThread::update(void) { std::wstring msg=L"U"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); } void DirectoryWatcherThread::init_mutex(void) { pipe=Server->createMemoryPipe(); update_mutex=Server->createMutex(); update_cond=Server->createCondition(); } void DirectoryWatcherThread::update_and_wait(void) { IScopedLock lock(update_mutex); std::wstring msg=L"U"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); update_cond->wait(&lock); } void DirectoryWatcherThread::freeze(void) { IScopedLock lock(update_mutex); std::wstring msg=L"K"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); update_cond->wait(&lock); } void DirectoryWatcherThread::unfreeze(void) { IScopedLock lock(update_mutex); std::wstring msg=L"H"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); update_cond->wait(&lock); } void DirectoryWatcherThread::update_last_backup_time(void) { IScopedLock lock(update_mutex); std::wstring msg=L"t"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); update_cond->wait(&lock); } void DirectoryWatcherThread::commit_last_backup_time(void) { IScopedLock lock(update_mutex); std::wstring msg=L"T"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); update_cond->wait(&lock); } void DirectoryWatcherThread::OnDirMod(const std::wstring &dir, const std::wstring &fn) { bool found=false; bool c=true; unsigned int currtime=Server->getTimeMS(); while(c) { c=false; for(std::list::iterator it=lastentries.begin();it!=lastentries.end();++it) { if(currtime-(*it).time>mindifftime) { lastentries.erase(it); c=true; break; } else if( (*it).dir==dir && (*it).fn==fn) { (*it).time=currtime; found=true; break; } } } if(found==false) { q_get_dir->Bind(dir); db_results res=q_get_dir->Read(); q_get_dir->Reset(); if(res.empty()) { q_get_dir_backup->Bind(dir); db_results res=q_get_dir_backup->Read(); q_get_dir_backup->Reset(); _i64 dir_id; if(res.empty()) { q_add_dir->Bind(dir); q_add_dir->Write(); q_add_dir->Reset(); dir_id=db->getLastInsertID(); } else { dir_id=watoi64(res[0][L"id"]); q_add_dir_with_id->Bind(dir_id); q_add_dir_with_id->Bind(dir); q_add_dir_with_id->Write(); q_add_dir_with_id->Reset(); } if(!fn.empty()) { q_add_file->Bind(dir_id); q_add_file->Bind(fn); q_add_file->Bind(dir_id); q_add_file->Bind(fn); q_add_file->Write(); q_add_file->Reset(); } } else if(!fn.empty()) { _i64 dir_id=watoi64(res[0][L"id"]); q_add_file->Bind(dir_id); q_add_file->Bind(fn); q_add_file->Bind(dir_id); q_add_file->Bind(fn); q_add_file->Write(); q_add_file->Reset(); } SLastEntries e; e.dir=dir; e.fn=fn; e.time=currtime; lastentries.push_back(e); } } void DirectoryWatcherThread::OnDirRm(const std::wstring &dir) { q_add_del_dir->Bind(dir); q_add_del_dir->Bind(dir); q_add_del_dir->Write(); q_add_del_dir->Reset(); } void DirectoryWatcherThread::stop(void) { do_stop=true; std::wstring msg=L"Q"; pipe->Write((char*)msg.c_str(), sizeof(wchar_t)*msg.size()); } IPipe *DirectoryWatcherThread::getPipe(void) { return pipe; } void DirectoryWatcherThread::On_FileNameChanged(const std::wstring & strOldFileName, const std::wstring & strNewFileName) { On_FileModified(strOldFileName, false); On_FileModified(strNewFileName, false); } void DirectoryWatcherThread::On_FileRemoved(const std::wstring & strFileName) { On_FileModified(strFileName, false); } void DirectoryWatcherThread::On_FileAdded(const std::wstring & strFileName) { On_FileModified(strFileName, false); } void DirectoryWatcherThread::On_FileModified(const std::wstring & strFileName, bool save_fn) { bool ok=false; std::wstring dir=strlower(ExtractFilePath(strFileName))+os_file_sep(); std::wstring fn; if(save_fn) { fn=ExtractFileName(strFileName); } for(size_t i=0;iWrite((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); dir1=L"S"+ExtractFilePath(strNewFileName); DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } void ChangeListener::On_FileRemoved(const std::wstring & strFileName) { std::wstring dir1=L"S"+ExtractFilePath(strFileName); DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } void ChangeListener::On_FileAdded(const std::wstring & strFileName) { std::wstring dir1=L"S"+ExtractFilePath(strFileName); DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } void ChangeListener::On_FileModified(const std::wstring & strFileName, bool save_fn) { std::wstring dir1=L"S"+ExtractFilePath(strFileName); DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); if(save_fn) { dir1=L"F"+strFileName; DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } } void ChangeListener::On_ResetAll(const std::wstring & vol) { std::wstring dir1=L"S##-GAP-##"+vol; DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } void ChangeListener::On_DirRemoved(const std::wstring & strDirName) { std::wstring dir1=L"R"+ExtractFilePath(strDirName); DirectoryWatcherThread::getPipe()->Write((char*)dir1.c_str(), dir1.size()*sizeof(wchar_t)); } std::wstring DirectoryWatcherThread::addSlashIfMissing(const std::wstring &strDirName) { if(!strDirName.empty() && strDirName[strDirName.size()-1]!=os_file_sep()[0]) { return strDirName+os_file_sep(); } return strDirName; }