Allow multiple simultaneous waiters for thread pool thread

(cherry picked from commit 8432f450001a6deb3d1d92542994a17b4dc049a0)
(cherry picked from commit 3665d35df0f29fe625359a1daa475c1c4bcc4329)
This commit is contained in:
Martin 2018-03-30 14:39:39 +02:00 committed by Martin Raiber
parent 76e6140c18
commit be34a3aa7f
2 changed files with 38 additions and 11 deletions

View File

@ -119,11 +119,16 @@ IThread * CThreadPool::getRunnable(THREADPOOL_TICKET *todel, bool del, bool& sto
if( del==true )
{
--nRunning;
std::map<THREADPOOL_TICKET, ICondition *>::iterator it=running.find(*todel);
std::map<THREADPOOL_TICKET, SRunningConds>::iterator it=running.find(*todel);
if( it!=running.end() )
{
if( it->second!=NULL )
it->second->notify_all();
if( it->second.cond!=NULL )
it->second.cond->notify_all();
for (size_t i = 0; i < it->second.conds.size(); ++i)
{
it->second.conds[i]->notify_all();
}
running.erase(it);
}
@ -219,7 +224,7 @@ void CThreadPool::Shutdown(void)
bool CThreadPool::isRunningInt(THREADPOOL_TICKET ticket)
{
std::map<THREADPOOL_TICKET, ICondition*>::iterator it=running.find(ticket);
std::map<THREADPOOL_TICKET, SRunningConds>::iterator it=running.find(ticket);
if( it!=running.end() )
return true;
else
@ -245,10 +250,13 @@ bool CThreadPool::waitFor(std::vector<THREADPOOL_TICKET> tickets, int timems)
for( size_t i=0;i<tickets.size();++i)
{
std::map<THREADPOOL_TICKET, ICondition*>::iterator it=running.find(tickets[i]);
std::map<THREADPOOL_TICKET, SRunningConds>::iterator it=running.find(tickets[i]);
if( it!=running.end() )
{
it->second=cond;
if (it->second.cond == NULL)
it->second.cond = cond;
else
it->second.conds.push_back(cond);
}
}
@ -291,12 +299,21 @@ bool CThreadPool::waitFor(std::vector<THREADPOOL_TICKET> tickets, int timems)
for( size_t i=0;i<tickets.size();++i)
{
std::map<THREADPOOL_TICKET, ICondition*>::iterator it=running.find(tickets[i]);
std::map<THREADPOOL_TICKET, SRunningConds>::iterator it=running.find(tickets[i]);
if( it!=running.end() )
{
if(it->second==cond)
if(it->second.cond==cond)
{
it->second=NULL;
it->second.cond=NULL;
}
else
{
std::vector<ICondition*>::iterator it_vec =
std::find(it->second.conds.begin(), it->second.conds.end(), cond);
if (it_vec != it->second.conds.end())
{
it->second.conds.erase(it_vec);
}
}
}
}
@ -343,7 +360,7 @@ THREADPOOL_TICKET CThreadPool::execute(IThread *runnable, const std::string& nam
}
toexecute.push_back(SNewTask(runnable, currticket, name));
running.insert(std::pair<THREADPOOL_TICKET, ICondition*>(currticket, (ICondition*)NULL) );
running.insert(std::pair<THREADPOOL_TICKET, SRunningConds>(currticket, SRunningConds()) );
++nRunning;
cond->notify_one();
return currticket;

View File

@ -64,7 +64,17 @@ private:
std::deque<SNewTask> toexecute;
IMutex* mutex;
ICondition* cond;
std::map<THREADPOOL_TICKET, ICondition*> running;
struct SRunningConds
{
SRunningConds()
: cond(NULL) {}
ICondition* cond;
std::vector<ICondition*> conds;
};
std::map<THREADPOOL_TICKET, SRunningConds> running;
THREADPOOL_TICKET currticket;
volatile bool dexit;