/************************************************************************* * UrBackup - Client/Server backup system * Copyright (C) 2011-2016 Martin Raiber * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . **************************************************************************/ #include "DirectoryWatcherThread.h" #include "../Interface/Server.h" #include "../stringtools.h" #include "../urbackupcommon/os_functions.h" #include "database.h" #include "client.h" #include "clientdao.h" #define CHANGE_JOURNAL IPipe *DirectoryWatcherThread::pipe=NULL; IMutex *DirectoryWatcherThread::update_mutex=NULL; ICondition *DirectoryWatcherThread::update_cond=NULL; std::vector DirectoryWatcherThread::open_files; namespace { const unsigned int max_change_ram_cache=10*60*1000; } DirectoryWatcherThread::DirectoryWatcherThread(const std::vector &watchdirs, const std::vector &watchdirs_continuous) { do_stop=false; watching=watchdirs; for(size_t i=0;iaddWatchdir(watchdirs_continuous[i]); } } } void DirectoryWatcherThread::operator()(void) { db=Server->getDatabase(Server->getThreadID(), URBACKUPDB_CLIENT); q_add_dir=db->Prepare("INSERT INTO mdirs (name) SELECT ? AS name WHERE NOT EXISTS (SELECT * FROM mdirs WHERE name=?)"); q_add_del_dir=db->Prepare("INSERT INTO del_dirs SELECT ? AS NAME WHERE NOT EXISTS (SELECT * FROM del_dirs WHERE name=?)"); q_update_last_backup_time=db->Prepare("INSERT OR REPLACE INTO misc (tkey, tvalue) VALUES ('last_backup_filetime', ?)"); q_remove_changed_dirs = db->Prepare("DELETE FROM mdirs WHERE name GLOB ?"); ChangeJournalWatcher dcw(this, db); dcw.add_listener(this); { db_results res = db->Read("SELECT tvalue FROM misc WHERE tkey='last_backup_filetime'"); if(!res.empty()) { dcw.set_last_backup_time(watoi64(res[0]["tvalue"])); } } for(size_t i=0;iRead(&msg, 10000); #ifdef CHANGE_JOURNAL if(msg.empty()) { dcw.update(); } #endif if(msg.size()>0 ) { if( msg[0]=='A' ) { std::string dir=strlower(add_trailing_slash(msg.substr(1))); bool w=false; for(size_t i=0;iaddWatchdir(ContinuousWatchEnqueue::SWatchItem(dir, name)); } else if( msg[0]=='X') { std::string dir=strlower(add_trailing_slash(getuntil("|", msg.substr(1)))); std::string name=getafter("|", msg.substr(1)); continuous_watch->removeWatchdir(ContinuousWatchEnqueue::SWatchItem(dir, name)); } else if( msg[0]=='U' ) { dcw.update(); IScopedLock lock(update_mutex); open_files.clear(); dcw.update_longliving(); update_cond->notify_all(); } else if( msg[0]=='Q' ) { continue; } else if( msg[0]=='t' ) { last_backup_filetime=get_current_filetime(); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if( msg[0]=='T' ) { dcw.set_last_backup_time(last_backup_filetime); q_update_last_backup_time->Bind(last_backup_filetime); q_update_last_backup_time->Write(); q_update_last_backup_time->Reset(); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if(msg[0]=='K' ) { dcw.set_freeze_open_write_files(true); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if(msg[0]=='H' ) { dcw.set_freeze_open_write_files(false); IScopedLock lock(update_mutex); update_cond->notify_all(); } else if (msg[0] == 'R') { std::string path = msg.substr(1); std::string sep = os_file_sep(); if (path == "##-GAP-##" || path.empty()) { sep = ""; } q_remove_changed_dirs->Bind(ClientDAO::escapeGlob(path) + sep + "*"); q_remove_changed_dirs->Write(); q_remove_changed_dirs->Reset(); lastentries.clear(); IScopedLock lock(update_mutex); update_cond->notify_all(); } } } db->destroyAllQueries(); } void DirectoryWatcherThread::update(void) { pipe->Write("U"); } void DirectoryWatcherThread::init_mutex(void) { pipe=Server->createMemoryPipe(); update_mutex=Server->createMutex(); update_cond=Server->createCondition(); } void DirectoryWatcherThread::update_and_wait(std::vector& r_open_files) { IScopedLock lock(update_mutex); pipe->Write("U"); update_cond->wait(&lock); r_open_files.insert(r_open_files.end(), open_files.begin(), open_files.end()); } void DirectoryWatcherThread::reset_mdirs(const std::string& path) { IScopedLock lock(update_mutex); pipe->Write("R"+ path); update_cond->wait(&lock); } void DirectoryWatcherThread::freeze(void) { IScopedLock lock(update_mutex); pipe->Write("K"); update_cond->wait(&lock); } void DirectoryWatcherThread::unfreeze(void) { IScopedLock lock(update_mutex); pipe->Write("H"); update_cond->wait(&lock); } void DirectoryWatcherThread::update_last_backup_time(void) { IScopedLock lock(update_mutex); pipe->Write("t"); update_cond->wait(&lock); } void DirectoryWatcherThread::commit_last_backup_time(void) { IScopedLock lock(update_mutex); pipe->Write("T"); update_cond->wait(&lock); } void DirectoryWatcherThread::OnDirMod(const std::string &dir) { bool found=false; int64 currtime=Server->getTimeMS(); for(std::list::iterator it=lastentries.begin();it!=lastentries.end();) { if(currtime-(*it).time>max_change_ram_cache) { lastentries.erase(it++); continue; } else if( (*it).dir==dir ) { (*it).time=currtime; found=true; break; } it++; } if(!found) { q_add_dir->Bind(dir); q_add_dir->Bind(dir); q_add_dir->Write(); q_add_dir->Reset(); SLastEntries e; e.dir=dir; e.time=currtime; lastentries.push_back(e); } } void DirectoryWatcherThread::OnDirRm(const std::string &dir) { q_add_del_dir->Bind(dir); q_add_del_dir->Bind(dir); q_add_del_dir->Write(); q_add_del_dir->Reset(); } void DirectoryWatcherThread::stop(void) { do_stop=true; pipe->Write("Q"); } IPipe *DirectoryWatcherThread::getPipe(void) { return pipe; } void DirectoryWatcherThread::On_FileNameChanged(const std::string & strOldFileName, const std::string & strNewFileName, bool closed) { On_FileModified(strOldFileName, closed); On_FileModified(strNewFileName, closed); } void DirectoryWatcherThread::On_DirNameChanged( const std::string & strOldFileName, const std::string & strNewFileName, bool closed ) { On_DirRemoved(strOldFileName, closed); On_FileModified(strNewFileName, closed); } void DirectoryWatcherThread::On_FileRemoved(const std::string & strFileName, bool closed) { On_FileModified(strFileName, closed); } void DirectoryWatcherThread::On_FileAdded(const std::string & strFileName, bool closed) { On_FileModified(strFileName, closed); } void DirectoryWatcherThread::On_DirAdded( const std::string & strFileName, bool closed ) { On_FileModified(strFileName, closed); } void DirectoryWatcherThread::On_FileModified(const std::string & strFileName, bool closed) { bool ok=false; std::string dir=strlower(ExtractFilePath(strFileName, os_file_sep()))+os_file_sep(); for(size_t i=0;i(ft.dwHighDateTime) << 32 | ft.dwLowDateTime; } void DirectoryWatcherThread::Commit(const std::vector& sequences) { } int64 DirectoryWatcherThread::getStartUsn( int64 sequence_id ) { return -1; } void DirectoryWatcherThread::On_FileOpen( const std::string & strFileName ) { open_files.push_back(strlower(strFileName)); }