Refactored the management of the request queues so that the appropraite mutex is locked when DatabaseRequest objects are modified

This commit is contained in:
Robert Osfield 2008-06-12 16:33:24 +00:00
parent fac838f791
commit b7a548923c

View File

@ -238,32 +238,42 @@ public:
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// RequestQueue
// ReadQueue
//
DatabasePager::RequestQueue::RequestQueue(DatabasePager* pager, const std::string& name):
DatabasePager::ReadQueue::ReadQueue(DatabasePager* pager, const std::string& name):
_pager(pager),
_name(name)
{
_block = new osg::RefBlock;
}
void DatabasePager::RequestQueue::clear()
void DatabasePager::ReadQueue::clear()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
for(RequestList::iterator citr = _requestList.begin();
citr != _requestList.end();
++citr)
{
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_requestList.clear();
updateBlock();
}
void DatabasePager::RequestQueue::add(DatabasePager::DatabaseRequest* databaseRequest)
void DatabasePager::ReadQueue::add(DatabasePager::DatabaseRequest* databaseRequest)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
_requestList.push_back(databaseRequest);
databaseRequest->_requestQueue = this;
updateBlock();
}
void DatabasePager::RequestQueue::takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest)
void DatabasePager::ReadQueue::takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
@ -271,6 +281,7 @@ void DatabasePager::RequestQueue::takeFirst(osg::ref_ptr<DatabaseRequest>& datab
{
_requestList.sort(SortFileRequestFunctor());
databaseRequest = _requestList.front();
databaseRequest->_requestQueue = 0;
_requestList.erase(_requestList.begin());
updateBlock();
@ -364,8 +375,8 @@ void DatabasePager::DatabaseThread::run()
bool firstTime = true;
osg::ref_ptr<DatabasePager::RequestQueue> read_queue;
osg::ref_ptr<DatabasePager::RequestQueue> out_queue;
osg::ref_ptr<DatabasePager::ReadQueue> read_queue;
osg::ref_ptr<DatabasePager::ReadQueue> out_queue;
switch(_mode)
{
@ -630,13 +641,15 @@ void DatabasePager::DatabaseThread::run()
{
if (loadedObjectsNeedToBeCompiled)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileListMutex);
_pager->_dataToCompileList.push_back(databaseRequest);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileList->_requestMutex);
databaseRequest->_requestQueue = _pager->_dataToCompileList.get();
_pager->_dataToCompileList->_requestList.push_back(databaseRequest);
}
else
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToMergeListMutex);
_pager->_dataToMergeList.push_back(databaseRequest);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToMergeList->_requestMutex);
databaseRequest->_requestQueue = _pager->_dataToMergeList.get();
_pager->_dataToMergeList->_requestList.push_back(databaseRequest);
}
}
@ -645,28 +658,29 @@ void DatabasePager::DatabaseThread::run()
// graphics context thread(s).
if (loadedObjectsNeedToBeCompiled)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileListMutex);
_pager->_dataToCompileList.sort(SortFileRequestFunctor());
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileList->_requestMutex);
_pager->_dataToCompileList->_requestList.sort(SortFileRequestFunctor());
// Prune all the old entries.
DatabaseRequestList::iterator tooOld
= std::find_if(_pager->_dataToCompileList.begin(),
_pager->_dataToCompileList.end(),
RequestQueue::RequestList::iterator tooOld
= std::find_if(_pager->_dataToCompileList->_requestList.begin(),
_pager->_dataToCompileList->_requestList.end(),
refPtrAdapt(std::not1(std::bind2nd(std::mem_fun(&DatabaseRequest::isRequestCurrent),
_pager->_frameNumber))));
// This is the database thread, so just delete
for(DatabaseRequestList::iterator citr = tooOld;
citr != _pager->_dataToCompileList.end();
for(RequestQueue::RequestList::iterator citr = tooOld;
citr != _pager->_dataToCompileList->_requestList.end();
++citr)
{
osg::notify(osg::INFO)<<_name<<": pruning from compile list"<<std::endl;
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_pager->_dataToCompileList.erase(tooOld, _pager->_dataToCompileList.end());
_pager->_dataToCompileList->_requestList.erase(tooOld, _pager->_dataToCompileList->_requestList.end());
loadedObjectsNeedToBeCompiled = !_pager->_dataToCompileList.empty();
loadedObjectsNeedToBeCompiled = !_pager->_dataToCompileList->_requestList.empty();
}
if (loadedObjectsNeedToBeCompiled && !_pager->_activeGraphicsContexts.empty())
@ -845,8 +859,12 @@ DatabasePager::DatabasePager()
//if (osgDB::Registry::instance()->getSharedStateManager())
//osgDB::Registry::instance()->setUseObjectCacheHint(true);
_fileRequestQueue = new RequestQueue(this,"fileRequestQueue");
_httpRequestQueue = new RequestQueue(this,"httpRequestQueue");
_fileRequestQueue = new ReadQueue(this,"fileRequestQueue");
_httpRequestQueue = new ReadQueue(this,"httpRequestQueue");
_dataToCompileList = new RequestQueue;
_dataToMergeList = new RequestQueue;
#if 0
_databaseThreads.push_back(new DatabaseThread(this, DatabaseThread::HANDLE_ALL_REQUESTS,"HANDLE_ALL_REQUESTS"));
@ -893,8 +911,11 @@ DatabasePager::DatabasePager(const DatabasePager& rhs)
_minimumTimeAvailableForGLCompileAndDeletePerFrame = rhs._minimumTimeAvailableForGLCompileAndDeletePerFrame;
_maximumNumOfObjectsToCompilePerFrame = rhs._maximumNumOfObjectsToCompilePerFrame;
_fileRequestQueue = new RequestQueue(this,"fileRequestQueue");
_httpRequestQueue = new RequestQueue(this,"httpRequestQueue");
_fileRequestQueue = new ReadQueue(this,"fileRequestQueue");
_httpRequestQueue = new ReadQueue(this,"httpRequestQueue");
_dataToCompileList = new RequestQueue;
_dataToMergeList = new RequestQueue;
for(DatabaseThreadList::const_iterator dt_itr = rhs._databaseThreads.begin();
dt_itr != rhs._databaseThreads.end();
@ -985,13 +1006,27 @@ void DatabasePager::clear()
_httpRequestQueue->clear();
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileListMutex);
_dataToCompileList.clear();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
for(RequestQueue::RequestList::iterator citr = _dataToCompileList->_requestList.begin();
citr != _dataToCompileList->_requestList.end();
++citr)
{
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_dataToCompileList->_requestList.clear();
}
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeListMutex);
_dataToMergeList.clear();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
for(RequestQueue::RequestList::iterator citr = _dataToMergeList->_requestList.begin();
citr != _dataToMergeList->_requestList.end();
++citr)
{
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_dataToMergeList->_requestList.clear();
}
// note, no need to use a mutex as the list is only accessed from the update thread.
@ -1050,12 +1085,25 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
DatabaseRequest* databaseRequest = dynamic_cast<DatabaseRequest*>(databaseRequestRef.get());
if (databaseRequest)
{
osg::notify(osg::INFO)<<"DatabasePager::fileRquest("<<fileName<<") updating alraedy assigned."<<std::endl;
osg::notify(osg::INFO)<<"DatabasePager::fileRequest("<<fileName<<") updating alraedy assigned."<<std::endl;
databaseRequest->_frameNumberLastRequest = frameNumber;
databaseRequest->_timestampLastRequest = timestamp;
databaseRequest->_priorityLastRequest = priority;
++(databaseRequest->_numOfRequests);
RequestQueue* requestQueue = databaseRequest->_requestQueue;
if (requestQueue)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(requestQueue->_requestMutex);
databaseRequest->_frameNumberLastRequest = frameNumber;
databaseRequest->_timestampLastRequest = timestamp;
databaseRequest->_priorityLastRequest = priority;
++(databaseRequest->_numOfRequests);
}
else
{
databaseRequest->_frameNumberLastRequest = frameNumber;
databaseRequest->_timestampLastRequest = timestamp;
databaseRequest->_priorityLastRequest = priority;
++(databaseRequest->_numOfRequests);
}
foundEntry = true;
@ -1071,6 +1119,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
databaseRequest->_priorityLastRequest = priority;
databaseRequest->_groupForAddingLoadedSubgraph = group;
databaseRequest->_loadOptions = loadOptions;
databaseRequest->_requestQueue = _fileRequestQueue.get();
_fileRequestQueue->add(databaseRequest);
}
@ -1099,6 +1148,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
databaseRequest->_priorityLastRequest = priority;
databaseRequest->_groupForAddingLoadedSubgraph = group;
databaseRequest->_loadOptions = loadOptions;
databaseRequest->_requestQueue = _fileRequestQueue.get();
_fileRequestQueue->_requestList.push_back(databaseRequest.get());
@ -1157,8 +1207,8 @@ void DatabasePager::setDatabasePagerThreadPause(bool pause)
bool DatabasePager::requiresUpdateSceneGraph() const
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeListMutex);
if (!_dataToMergeList.empty()) return true;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
if (!_dataToMergeList->_requestList.empty()) return true;
}
return false;
@ -1171,16 +1221,16 @@ void DatabasePager::addLoadedDataToSceneGraph(double timeStamp)
// osg::Timer_t before = osg::Timer::instance()->tick();
DatabaseRequestList localFileLoadedList;
RequestQueue::RequestList localFileLoadedList;
// get the dat for the _dataToCompileList, leaving it empty via a std::vector<>.swap.
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeListMutex);
localFileLoadedList.swap(_dataToMergeList);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
localFileLoadedList.swap(_dataToMergeList->_requestList);
}
// add the loaded data into the scene graph.
for(DatabaseRequestList::iterator itr=localFileLoadedList.begin();
for(RequestQueue::RequestList::iterator itr=localFileLoadedList.begin();
itr!=localFileLoadedList.end();
++itr)
{
@ -1360,8 +1410,8 @@ void DatabasePager::registerPagedLODs(osg::Node* subgraph)
bool DatabasePager::requiresCompileGLObjects() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileListMutex);
return !_dataToCompileList.empty();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
return !_dataToCompileList->_requestList.empty();
}
void DatabasePager::setCompileGLObjectsForContextID(unsigned int contextID, bool on)
@ -1433,10 +1483,10 @@ void DatabasePager::compileGLObjects(osg::State& state, double& availableTime)
// get the first compilable entry.
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileListMutex);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
// advance to the next entry to compile if one is available.
databaseRequest = _dataToCompileList.empty() ? 0 : _dataToCompileList.front();
databaseRequest = _dataToCompileList->_requestList.empty() ? 0 : _dataToCompileList->_requestList.front();
};
unsigned int numObjectsCompiled = 0;
@ -1553,7 +1603,7 @@ void DatabasePager::compileGLObjects(osg::State& state, double& availableTime)
// osg::notify(osg::NOTICE)<<"All compiled"<<std::endl;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileListMutex);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
// The request might have been removed from the
// _dataToCompile list by another graphics thread, in
@ -1563,19 +1613,21 @@ void DatabasePager::compileGLObjects(osg::State& state, double& availableTime)
// shuffled by the pager, so the current request is
// not guaranteed to still be at the beginning of the
// list.
DatabaseRequestList::iterator requestIter
= std::find(_dataToCompileList.begin(), _dataToCompileList.end(),
RequestQueue::RequestList::iterator requestIter
= std::find(_dataToCompileList->_requestList.begin(), _dataToCompileList->_requestList.end(),
databaseRequest);
if (requestIter != _dataToCompileList.end())
if (requestIter != _dataToCompileList->_requestList.end())
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeListMutex);
_dataToMergeList.push_back(databaseRequest);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
databaseRequest->_requestQueue = _dataToMergeList.get();
_dataToMergeList->_requestList.push_back(databaseRequest);
}
_dataToCompileList.erase(requestIter);
_dataToCompileList->_requestList.erase(requestIter);
}
if (!_dataToCompileList.empty()) databaseRequest = _dataToCompileList.front();
if (!_dataToCompileList->_requestList.empty()) databaseRequest = _dataToCompileList->_requestList.front();
else databaseRequest = 0;
}