TerraSync: handle reinit better
Fix various cases where re-initialisation could leave things blocked. Remove the duplicate storage of the active paths: we now always check the primary data held by the worker thread, so the two copies can no longer get out of sync. Also remove the obsolete persistent-cache code.

Fixes some of the issues discussed in: https://sourceforge.net/p/flightgear/codetickets/2308/

Further improvements still to come, especially retrying on a better schedule for intermittent connections.
parent 3643f1c064 · commit ece7dab47c
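For background, here is a minimal sketch of the pattern this change moves to: the front end asks the worker for its own pending data instead of keeping a mirrored set of active paths that has to be cleared on re-init. Names here are simplified placeholders, not the real SimGear classes.

    // Sketch only: illustrates querying the primary data rather than a duplicate copy.
    #include <algorithm>
    #include <deque>
    #include <iostream>
    #include <mutex>
    #include <string>

    class Worker {
    public:
        void enqueue(const std::string& dir)
        {
            std::lock_guard<std::mutex> g(_lock);
            _pending.push_back(dir);
        }

        // The front end asks the worker directly, so there is no second
        // bookkeeping structure that can drift out of sync or get stuck
        // across a re-init.
        bool isDirActive(const std::string& dir) const
        {
            std::lock_guard<std::mutex> g(_lock);
            return std::find(_pending.begin(), _pending.end(), dir) != _pending.end();
        }

    private:
        mutable std::mutex _lock;          // mutable so const queries can lock
        std::deque<std::string> _pending;  // the single source of truth
    };

    int main()
    {
        Worker w;
        const std::string dir = "Terrain/e000n40";
        if (!w.isDirActive(dir)) {   // no duplicate set to consult or clear
            w.enqueue(dir);
        }
        std::cout << std::boolalpha << w.isDirActive(dir) << '\n';   // prints: true
        return 0;
    }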
@@ -165,6 +165,14 @@ void Client::setMaxPipelineDepth(unsigned int depth)
 #endif
 }
 
+void Client::reset()
+{
+    curl_multi_cleanup(d->curlMulti);
+    d.reset(new ClientPrivate);
+    d->tlsCertificatePath = SGPath::fromEnv("SIMGEAR_TLS_CERT_PATH");
+    d->createCurlMulti();
+}
+
 void Client::update(int waitTimeout)
 {
     if (d->requests.empty()) {
@@ -47,6 +47,8 @@ public:
     void update(int waitTimeout = 0);
 
+    void reset();
+
     void makeRequest(const Request_ptr& r);
 
     void cancelRequest(const Request_ptr& r, std::string reason = std::string());
@@ -155,20 +155,16 @@ public:
 class SyncSlot
 {
 public:
-    SyncSlot() :
-        isNewDirectory(false),
-        busy(false),
-        pendingKBytes(0)
-    {}
+    SyncSlot() = default;
 
     SyncItem currentItem;
-    bool isNewDirectory;
-    std::queue<SyncItem> queue;
+    bool isNewDirectory = false;
+    std::deque<SyncItem> queue;
     std::unique_ptr<HTTPRepository> repository;
     SGTimeStamp stamp;
-    bool busy; ///< is the slot working or idle
-    unsigned int pendingKBytes;
-    unsigned int nextWarnTimeout;
+    bool busy = false; ///< is the slot working or idle
+    unsigned int pendingKBytes = 0;
+    unsigned int nextWarnTimeout = 0;
 };
 
 static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync
@@ -312,7 +308,6 @@ public:
         _state._allowed_errors = errors;
     }
 
-    void setCachePath(const SGPath& p) {_persistentCachePath = p;}
     void setCacheHits(unsigned int hits)
     {
         std::lock_guard<std::mutex> g(_stateLock);
@@ -328,6 +323,9 @@ public:
         }
         return st;
     }
+
+    bool isDirActive(const std::string& path) const;
+
 private:
     void incrementCacheHits()
     {
@@ -341,11 +339,11 @@ private:
     void runInternal();
     void updateSyncSlot(SyncSlot& slot);
 
+    void drainWaitingTiles();
+
     // commond helpers between both internal and external models
 
     SyncItem::Status isPathCached(const SyncItem& next) const;
-    void initCompletedTilesPersistentCache();
-    void writeCompletedTilesPersistentCache() const;
     void updated(SyncItem item, bool isNewDirectory);
     void fail(SyncItem failedItem);
     void notFound(SyncItem notFoundItem);
@@ -370,7 +368,7 @@ private:
     string _dnsdn;
 
     TerrasyncThreadState _state;
-    std::mutex _stateLock;
+    mutable std::mutex _stateLock;
 };
 
 SGTerraSync::WorkerThread::WorkerThread() :
@@ -398,6 +396,18 @@ void SGTerraSync::WorkerThread::stop()
     SyncItem w(string(), SyncItem::Stop);
     request(w);
     join();
+
+    // clear the sync slots, in case we restart
+    for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) {
+        _syncSlots[slot] = {};
+    }
+
+    // clear these so if re-init-ing, we check again
+    _completedTiles.clear();
+    _notFoundItems.clear();
+
+    _http.reset();
+    _http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION));
 }
 
 bool SGTerraSync::WorkerThread::start()
@@ -417,12 +427,18 @@ bool SGTerraSync::WorkerThread::start()
     SGPath path(_local_dir);
     if (!path.exists())
     {
-        SG_LOG(SG_TERRASYNC,SG_ALERT,
-               "Cannot start scenery download. Directory '" << _local_dir <<
-               "' does not exist. Set correct directory path or create directory folder.");
-        _state._fail_count++;
-        _state._stalled = true;
-        return false;
+        const SGPath parentDir = path.dirPath();
+        if (parentDir.exists()) {
+            // attempt to create terraSync dir ourselves
+            bool ok = path.create_dir(0755);
+            if (!ok) {
+                SG_LOG(SG_TERRASYNC, SG_ALERT,
+                       "Cannot start scenery download. Directory '" << _local_dir << "' does not exist. Set correct directory path or create directory folder.");
+                _state._fail_count++;
+                _state._stalled = true;
+                return false;
+            }
+        }
     }
 
     path.append("version");
@@ -523,8 +539,6 @@ void SGTerraSync::WorkerThread::run()
         _running = true;
     }
 
-    initCompletedTilesPersistentCache();
-
     runInternal();
 
     {
@@ -541,7 +555,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
         if (slot.stamp.elapsedMSec() > (int)slot.nextWarnTimeout) {
             SG_LOG(SG_TERRASYNC, SG_INFO, "sync taking a long time:" << slot.currentItem._dir << " taken " << slot.stamp.elapsedMSec());
             SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests());
-            slot.nextWarnTimeout += 10000;
+            slot.nextWarnTimeout += 30 * 1000;
         }
 #endif
     // convert bytes to kbytes here
@@ -565,12 +579,13 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
         slot.busy = false;
         slot.repository.reset();
         slot.pendingKBytes = 0;
+        slot.currentItem = {};
     }
 
     // init and start sync of the next repository
     if (!slot.queue.empty()) {
         slot.currentItem = slot.queue.front();
-        slot.queue.pop();
+        slot.queue.pop_front();
 
         SGPath path(_local_dir);
         path.append(slot.currentItem._dir);
@@ -605,7 +620,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
         return;
     }
 
-    slot.nextWarnTimeout = 20000;
+    slot.nextWarnTimeout = 30 * 1000;
     slot.stamp.stamp();
    slot.busy = true;
    slot.pendingKBytes = slot.repository->bytesToDownload();
@@ -648,21 +663,7 @@ void SGTerraSync::WorkerThread::runInternal()
         if (_stop)
             break;
 
-        // drain the waiting tiles queue into the sync slot queues.
-        while (!waitingTiles.empty()) {
-            SyncItem next = waitingTiles.pop_front();
-            SyncItem::Status cacheStatus = isPathCached(next);
-            if (cacheStatus != SyncItem::Invalid) {
-                incrementCacheHits();
-                SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'");
-                next._status = cacheStatus;
-                _freshTiles.push_back(next);
-                continue;
-            }
-
-            unsigned int slot = syncSlotForType(next._type);
-            _syncSlots[slot].queue.push(next);
-        }
+        drainWaitingTiles();
 
         bool anySlotBusy = false;
         unsigned int newPendingCount = 0;
@@ -691,7 +692,7 @@ void SGTerraSync::WorkerThread::runInternal()
 
 SyncItem::Status SGTerraSync::WorkerThread::isPathCached(const SyncItem& next) const
 {
-    TileAgeCache::const_iterator ii = _completedTiles.find( next._dir );
+    auto ii = _completedTiles.find(next._dir);
     if (ii == _completedTiles.end()) {
         ii = _notFoundItems.find( next._dir );
         // Invalid means 'not cached', otherwise we want to return to
@@ -735,7 +736,6 @@ void SGTerraSync::WorkerThread::notFound(SyncItem item)
     item._status = SyncItem::NotFound;
     _freshTiles.push_back(item);
     _notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
-    writeCompletedTilesPersistentCache();
 }
 
 void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
@@ -756,72 +756,57 @@ void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
         _freshTiles.push_back(item);
         _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
     }
-
-    writeCompletedTilesPersistentCache();
 }
 
-void SGTerraSync::WorkerThread::initCompletedTilesPersistentCache()
+void SGTerraSync::WorkerThread::drainWaitingTiles()
 {
-    if (!_persistentCachePath.exists()) {
-        return;
-    }
-
-    SGPropertyNode_ptr cacheRoot(new SGPropertyNode);
-    time_t now = time(0);
-
-    try {
-        readProperties(_persistentCachePath, cacheRoot);
-    } catch (sg_exception& e) {
-        SG_LOG(SG_TERRASYNC, SG_INFO, "corrupted persistent cache, discarding " << e.getFormattedMessage());
-        return;
-    }
-
-    for (int i=0; i<cacheRoot->nChildren(); ++i) {
-        SGPropertyNode* entry = cacheRoot->getChild(i);
-        bool isNotFound = (strcmp(entry->getName(), "not-found") == 0);
-        string tileName = entry->getStringValue("path");
-        time_t stamp = entry->getIntValue("stamp");
-        if (stamp < now) {
+    // drain the waiting tiles queue into the sync slot queues.
+    while (!waitingTiles.empty()) {
+        SyncItem next = waitingTiles.pop_front();
+        SyncItem::Status cacheStatus = isPathCached(next);
+        if (cacheStatus != SyncItem::Invalid) {
+            incrementCacheHits();
+            SG_LOG(SG_TERRASYNC, SG_BULK, "\nTerraSync Cache hit for: '" << next._dir << "'");
+            next._status = cacheStatus;
+            _freshTiles.push_back(next);
             continue;
         }
 
-        if (isNotFound) {
-            _completedTiles[tileName] = stamp;
-        } else {
-            _notFoundItems[tileName] = stamp;
-        }
+        const auto slot = syncSlotForType(next._type);
+        _syncSlots[slot].queue.push_back(next);
     }
 }
 
-void SGTerraSync::WorkerThread::writeCompletedTilesPersistentCache() const
+bool SGTerraSync::WorkerThread::isDirActive(const std::string& path) const
 {
-    // cache is disabled
-    if (_persistentCachePath.isNull()) {
-        return;
+    // check waiting tiles first. we have to copy it to check safely,
+    // but since it's normally empty, this is not a big deal.
+    const auto copyOfWaiting = waitingTiles.copy();
+    auto it = std::find_if(copyOfWaiting.begin(), copyOfWaiting.end(), [&path](const SyncItem& i) {
+        return i._dir == path;
+    });
+
+    if (it != copyOfWaiting.end()) {
+        return true;
     }
 
-    sg_ofstream f(_persistentCachePath, std::ios::trunc);
-    if (!f.is_open()) {
-        return;
-    }
+    // check each sync slot in turn
+    std::lock_guard<std::mutex> g(_stateLock);
+    for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) {
+        const auto& syncSlot = _syncSlots[slot];
+        if (syncSlot.currentItem._dir == path)
+            return true;
 
-    SGPropertyNode_ptr cacheRoot(new SGPropertyNode);
-    TileAgeCache::const_iterator it = _completedTiles.begin();
-    for (; it != _completedTiles.end(); ++it) {
-        SGPropertyNode* entry = cacheRoot->addChild("entry");
-        entry->setStringValue("path", it->first);
-        entry->setIntValue("stamp", it->second);
-    }
+        auto it = std::find_if(syncSlot.queue.begin(), syncSlot.queue.end(), [&path](const SyncItem& i) {
+            return i._dir == path;
+        });
 
-    it = _notFoundItems.begin();
-    for (; it != _notFoundItems.end(); ++it) {
-        SGPropertyNode* entry = cacheRoot->addChild("not-found");
-        entry->setStringValue("path", it->first);
-        entry->setIntValue("stamp", it->second);
-    }
+        if (it != syncSlot.queue.end()) {
+            return true;
+        }
+    } // of sync slots iteration
 
-    writeProperties(f, cacheRoot, true /* write_all */);
-    f.close();
+    return false;
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -996,12 +981,9 @@ void SGTerraSync::update(double)
 
     while (_workerThread->hasNewTiles())
     {
-        SyncItem next = _workerThread->getNewTile();
-
-        if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
-            _activeTileDirs.erase(next._dir);
-        }
-    } // of freshly synced items
+        // ensure they are popped
+        _workerThread->getNewTile();
+    }
 }
 
 bool SGTerraSync::isIdle() {return _workerThread->isIdle();}
@@ -1046,17 +1028,16 @@ string_list SGTerraSync::getSceneryPathSuffixes() const
 
 void SGTerraSync::syncAreaByPath(const std::string& aPath)
 {
-    string_list scenerySuffixes = getSceneryPathSuffixes();
-    string_list::const_iterator it = scenerySuffixes.begin();
+    if (!_workerThread->isRunning()) {
+        return;
+    }
 
-    for (; it != scenerySuffixes.end(); ++it)
-    {
-        std::string dir = *it + "/" + aPath;
-        if (_activeTileDirs.find(dir) != _activeTileDirs.end()) {
+    for (const auto& suffix : getSceneryPathSuffixes()) {
+        const auto dir = suffix + "/" + aPath;
+        if (_workerThread->isDirActive(dir)) {
             continue;
         }
 
-        _activeTileDirs.insert(dir);
         SyncItem w(dir, SyncItem::Tile);
         _workerThread->request( w );
     }
@@ -1075,13 +1056,9 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
         return false;
     }
 
-    string_list scenerySuffixes = getSceneryPathSuffixes();
-    string_list::const_iterator it = scenerySuffixes.begin();
-
-    for (; it != scenerySuffixes.end(); ++it)
-    {
-        string s = *it + "/" + sceneryDir;
-        if (_activeTileDirs.find(s) != _activeTileDirs.end()) {
+    for (const auto& suffix : getSceneryPathSuffixes()) {
+        const auto s = suffix + "/" + sceneryDir;
+        if (_workerThread->isDirActive(s)) {
             return true;
         }
     }
@@ -1091,14 +1068,16 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
 
 void SGTerraSync::scheduleDataDir(const std::string& dataDir)
 {
-    if (_activeTileDirs.find(dataDir) != _activeTileDirs.end()) {
+    if (!_workerThread->isRunning()) {
+        return;
+    }
+
+    if (_workerThread->isDirActive(dataDir)) {
         return;
     }
 
-    _activeTileDirs.insert(dataDir);
     SyncItem w(dataDir, SyncItem::AIData);
     _workerThread->request( w );
 
 }
 
 bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
@@ -1107,7 +1086,7 @@ bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
         return false;
     }
 
-    return (_activeTileDirs.find(dataDir) != _activeTileDirs.end());
+    return _workerThread->isDirActive(dataDir);
 }
 
 void SGTerraSync::reposition()
|
@ -116,9 +116,6 @@ private:
|
||||
|
||||
simgear::TiedPropertyList _tiedProperties;
|
||||
BufferedLogCallback* _log;
|
||||
|
||||
typedef std::set<std::string> string_set;
|
||||
string_set _activeTileDirs;
|
||||
};
|
||||
|
||||
}
|
||||
|
@@ -272,30 +272,35 @@ template<class T>
 class SGBlockingDeque
 {
 public:
+    using value_type = T;
+    using container_type = std::deque<T>;
+
     /**
     * Create a new SGBlockingDequeue.
     */
-    SGBlockingDeque() {}
+    SGBlockingDeque() = default;
 
     /**
     * Destroy this dequeue.
     */
-    virtual ~SGBlockingDeque() {}
+    ~SGBlockingDeque() = default;
 
    /**
    *
    */
-    virtual void clear() {
-        std::lock_guard<std::mutex> g(mutex);
-        this->queue.clear();
+    void clear()
+    {
+        std::lock_guard<std::mutex> g(mutex);
+        this->queue.clear();
    }
 
    /**
    *
    */
-    virtual bool empty() {
-        std::lock_guard<std::mutex> g(mutex);
-        return this->queue.empty();
+    bool empty() const
+    {
+        std::lock_guard<std::mutex> g(mutex);
+        return this->queue.empty();
    }
 
    /**
@@ -303,10 +308,11 @@ public:
    *
    * @param item The object to add.
    */
-    virtual void push_front( const T& item ) {
-        std::lock_guard<std::mutex> g(mutex);
-        this->queue.push_front( item );
-        not_empty.signal();
+    void push_front(const T& item)
+    {
+        std::lock_guard<std::mutex> g(mutex);
+        this->queue.push_front(item);
+        not_empty.signal();
    }
 
    /**
@@ -314,10 +320,11 @@ public:
    *
    * @param item The object to add.
    */
-    virtual void push_back( const T& item ) {
-        std::lock_guard<std::mutex> g(mutex);
-        this->queue.push_back( item );
-        not_empty.signal();
+    void push_back(const T& item)
+    {
+        std::lock_guard<std::mutex> g(mutex);
+        this->queue.push_back(item);
+        not_empty.signal();
    }
 
    /**
@@ -326,14 +333,15 @@ public:
    *
    * @return The next available object.
    */
-    virtual T front() {
-        std::lock_guard<std::mutex> g(mutex);
+    T front() const
+    {
+        std::lock_guard<std::mutex> g(mutex);
 
-        assert(this->queue.empty() != true);
-        //if (queue.empty()) throw ??
+        assert(this->queue.empty() != true);
+        //if (queue.empty()) throw ??
 
-        T item = this->queue.front();
-        return item;
+        T item = this->queue.front();
+        return item;
    }
 
    /**
@@ -342,18 +350,19 @@ public:
    *
    * @return The next available object.
    */
-    virtual T pop_front() {
-        std::lock_guard<std::mutex> g(mutex);
+    T pop_front()
+    {
+        std::lock_guard<std::mutex> g(mutex);
 
-        while (this->queue.empty())
-            not_empty.wait(mutex);
+        while (this->queue.empty())
+            not_empty.wait(mutex);
 
-        assert(this->queue.empty() != true);
-        //if (queue.empty()) throw ??
+        assert(this->queue.empty() != true);
+        //if (queue.empty()) throw ??
 
-        T item = this->queue.front();
-        this->queue.pop_front();
-        return item;
+        T item = this->queue.front();
+        this->queue.pop_front();
+        return item;
    }
 
    /**
@@ -362,18 +371,19 @@ public:
    *
    * @return The next available object.
    */
-    virtual T pop_back() {
-        std::lock_guard<std::mutex> g(mutex);
+    T pop_back()
+    {
+        std::lock_guard<std::mutex> g(mutex);
 
-        while (this->queue.empty())
-            not_empty.wait(mutex);
+        while (this->queue.empty())
+            not_empty.wait(mutex);
 
-        assert(this->queue.empty() != true);
-        //if (queue.empty()) throw ??
+        assert(this->queue.empty() != true);
+        //if (queue.empty()) throw ??
 
-        T item = this->queue.back();
-        this->queue.pop_back();
-        return item;
+        T item = this->queue.back();
+        this->queue.pop_back();
+        return item;
    }
 
    /**
@@ -381,8 +391,9 @@ public:
    *
    * @return Size of queue.
    */
-    virtual size_t size() {
-        std::lock_guard<std::mutex> g(mutex);
+    size_t size() const
+    {
+        std::lock_guard<std::mutex> g(mutex);
        return this->queue.size();
    }
 
@@ -391,12 +402,19 @@ public:
        while (this->queue.empty())
            not_empty.wait(mutex);
    }
 
+    container_type copy() const
+    {
+        std::lock_guard<std::mutex> g(mutex);
+        return queue;
+    }
+
 private:
 
    /**
    * Mutex to serialise access.
    */
-    std::mutex mutex;
+    mutable std::mutex mutex;
 
    /**
    * Condition to signal when queue not empty.
@@ -409,7 +427,7 @@ private:
    SGBlockingDeque& operator=( const SGBlockingDeque& );
 
 protected:
-    std::deque<T> queue;
+    container_type queue;
 };
 
 #endif // SGQUEUE_HXX_INCLUDED