TerraSync: handle reinit better

Fix various cases where re-init could get things blocked. Remove the
duplicate storage of the active paths; now we always check the primary
data, and hence it can’t be out of sync.

Also remove the obsolete persistent cache code.

Fixes some of the issues discussed in:
https://sourceforge.net/p/flightgear/codetickets/2308/

Further improvements still to come, especially to retry on a better
schedule for intermittent connections.
This commit is contained in:
James Turner 2020-08-03 18:01:19 +01:00 committed by Automatic Release Builder
parent 24b58cbe21
commit 0721db3acd
5 changed files with 168 additions and 164 deletions

View File

@ -165,6 +165,14 @@ void Client::setMaxPipelineDepth(unsigned int depth)
#endif
}
void Client::reset()
{
curl_multi_cleanup(d->curlMulti);
d.reset(new ClientPrivate);
d->tlsCertificatePath = SGPath::fromEnv("SIMGEAR_TLS_CERT_PATH");
d->createCurlMulti();
}
void Client::update(int waitTimeout)
{
if (d->requests.empty()) {

View File

@ -47,6 +47,8 @@ public:
void update(int waitTimeout = 0);
void reset();
void makeRequest(const Request_ptr& r);
void cancelRequest(const Request_ptr& r, std::string reason = std::string());

View File

@ -155,20 +155,16 @@ public:
class SyncSlot
{
public:
SyncSlot() :
isNewDirectory(false),
busy(false),
pendingKBytes(0)
{}
SyncSlot() = default;
SyncItem currentItem;
bool isNewDirectory;
std::queue<SyncItem> queue;
bool isNewDirectory = false;
std::deque<SyncItem> queue;
std::unique_ptr<HTTPRepository> repository;
SGTimeStamp stamp;
bool busy; ///< is the slot working or idle
unsigned int pendingKBytes;
unsigned int nextWarnTimeout;
bool busy = false; ///< is the slot working or idle
unsigned int pendingKBytes = 0;
unsigned int nextWarnTimeout = 0;
};
static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync
@ -312,7 +308,6 @@ public:
_state._allowed_errors = errors;
}
void setCachePath(const SGPath& p) {_persistentCachePath = p;}
void setCacheHits(unsigned int hits)
{
std::lock_guard<std::mutex> g(_stateLock);
@ -328,6 +323,9 @@ public:
}
return st;
}
bool isDirActive(const std::string& path) const;
private:
void incrementCacheHits()
{
@ -341,11 +339,11 @@ private:
void runInternal();
void updateSyncSlot(SyncSlot& slot);
void drainWaitingTiles();
// commond helpers between both internal and external models
SyncItem::Status isPathCached(const SyncItem& next) const;
void initCompletedTilesPersistentCache();
void writeCompletedTilesPersistentCache() const;
void updated(SyncItem item, bool isNewDirectory);
void fail(SyncItem failedItem);
void notFound(SyncItem notFoundItem);
@ -370,7 +368,7 @@ private:
string _dnsdn;
TerrasyncThreadState _state;
std::mutex _stateLock;
mutable std::mutex _stateLock;
};
SGTerraSync::WorkerThread::WorkerThread() :
@ -398,6 +396,18 @@ void SGTerraSync::WorkerThread::stop()
SyncItem w(string(), SyncItem::Stop);
request(w);
join();
// clear the sync slots, in case we restart
for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) {
_syncSlots[slot] = {};
}
// clear these so if re-init-ing, we check again
_completedTiles.clear();
_notFoundItems.clear();
_http.reset();
_http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION));
}
bool SGTerraSync::WorkerThread::start()
@ -417,13 +427,19 @@ bool SGTerraSync::WorkerThread::start()
SGPath path(_local_dir);
if (!path.exists())
{
SG_LOG(SG_TERRASYNC,SG_ALERT,
"Cannot start scenery download. Directory '" << _local_dir <<
"' does not exist. Set correct directory path or create directory folder.");
const SGPath parentDir = path.dirPath();
if (parentDir.exists()) {
// attempt to create terraSync dir ourselves
bool ok = path.create_dir(0755);
if (!ok) {
SG_LOG(SG_TERRASYNC, SG_ALERT,
"Cannot start scenery download. Directory '" << _local_dir << "' does not exist. Set correct directory path or create directory folder.");
_state._fail_count++;
_state._stalled = true;
return false;
}
}
}
path.append("version");
if (path.exists())
@ -523,8 +539,6 @@ void SGTerraSync::WorkerThread::run()
_running = true;
}
initCompletedTilesPersistentCache();
runInternal();
{
@ -541,7 +555,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
if (slot.stamp.elapsedMSec() > (int)slot.nextWarnTimeout) {
SG_LOG(SG_TERRASYNC, SG_INFO, "sync taking a long time:" << slot.currentItem._dir << " taken " << slot.stamp.elapsedMSec());
SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests());
slot.nextWarnTimeout += 10000;
slot.nextWarnTimeout += 30 * 1000;
}
#endif
// convert bytes to kbytes here
@ -565,12 +579,13 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
slot.busy = false;
slot.repository.reset();
slot.pendingKBytes = 0;
slot.currentItem = {};
}
// init and start sync of the next repository
if (!slot.queue.empty()) {
slot.currentItem = slot.queue.front();
slot.queue.pop();
slot.queue.pop_front();
SGPath path(_local_dir);
path.append(slot.currentItem._dir);
@ -605,7 +620,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
return;
}
slot.nextWarnTimeout = 20000;
slot.nextWarnTimeout = 30 * 1000;
slot.stamp.stamp();
slot.busy = true;
slot.pendingKBytes = slot.repository->bytesToDownload();
@ -648,21 +663,7 @@ void SGTerraSync::WorkerThread::runInternal()
if (_stop)
break;
// drain the waiting tiles queue into the sync slot queues.
while (!waitingTiles.empty()) {
SyncItem next = waitingTiles.pop_front();
SyncItem::Status cacheStatus = isPathCached(next);
if (cacheStatus != SyncItem::Invalid) {
incrementCacheHits();
SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'");
next._status = cacheStatus;
_freshTiles.push_back(next);
continue;
}
unsigned int slot = syncSlotForType(next._type);
_syncSlots[slot].queue.push(next);
}
drainWaitingTiles();
bool anySlotBusy = false;
unsigned int newPendingCount = 0;
@ -691,7 +692,7 @@ void SGTerraSync::WorkerThread::runInternal()
SyncItem::Status SGTerraSync::WorkerThread::isPathCached(const SyncItem& next) const
{
TileAgeCache::const_iterator ii = _completedTiles.find( next._dir );
auto ii = _completedTiles.find(next._dir);
if (ii == _completedTiles.end()) {
ii = _notFoundItems.find( next._dir );
// Invalid means 'not cached', otherwise we want to return to
@ -735,7 +736,6 @@ void SGTerraSync::WorkerThread::notFound(SyncItem item)
item._status = SyncItem::NotFound;
_freshTiles.push_back(item);
_notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
writeCompletedTilesPersistentCache();
}
void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
@ -756,72 +756,57 @@ void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory)
_freshTiles.push_back(item);
_completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
}
writeCompletedTilesPersistentCache();
}
void SGTerraSync::WorkerThread::initCompletedTilesPersistentCache()
void SGTerraSync::WorkerThread::drainWaitingTiles()
{
if (!_persistentCachePath.exists()) {
return;
}
SGPropertyNode_ptr cacheRoot(new SGPropertyNode);
time_t now = time(0);
try {
readProperties(_persistentCachePath, cacheRoot);
} catch (sg_exception& e) {
SG_LOG(SG_TERRASYNC, SG_INFO, "corrupted persistent cache, discarding " << e.getFormattedMessage());
return;
}
for (int i=0; i<cacheRoot->nChildren(); ++i) {
SGPropertyNode* entry = cacheRoot->getChild(i);
bool isNotFound = (strcmp(entry->getName(), "not-found") == 0);
string tileName = entry->getStringValue("path");
time_t stamp = entry->getIntValue("stamp");
if (stamp < now) {
// drain the waiting tiles queue into the sync slot queues.
while (!waitingTiles.empty()) {
SyncItem next = waitingTiles.pop_front();
SyncItem::Status cacheStatus = isPathCached(next);
if (cacheStatus != SyncItem::Invalid) {
incrementCacheHits();
SG_LOG(SG_TERRASYNC, SG_BULK, "\nTerraSync Cache hit for: '" << next._dir << "'");
next._status = cacheStatus;
_freshTiles.push_back(next);
continue;
}
if (isNotFound) {
_completedTiles[tileName] = stamp;
} else {
_notFoundItems[tileName] = stamp;
}
const auto slot = syncSlotForType(next._type);
_syncSlots[slot].queue.push_back(next);
}
}
void SGTerraSync::WorkerThread::writeCompletedTilesPersistentCache() const
bool SGTerraSync::WorkerThread::isDirActive(const std::string& path) const
{
// cache is disabled
if (_persistentCachePath.isNull()) {
return;
// check waiting tiles first. we have to copy it to check safely,
// but since it's normally empty, this is not a big deal.
const auto copyOfWaiting = waitingTiles.copy();
auto it = std::find_if(copyOfWaiting.begin(), copyOfWaiting.end(), [&path](const SyncItem& i) {
return i._dir == path;
});
if (it != copyOfWaiting.end()) {
return true;
}
sg_ofstream f(_persistentCachePath, std::ios::trunc);
if (!f.is_open()) {
return;
}
// check each sync slot in turn
std::lock_guard<std::mutex> g(_stateLock);
for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) {
const auto& syncSlot = _syncSlots[slot];
if (syncSlot.currentItem._dir == path)
return true;
SGPropertyNode_ptr cacheRoot(new SGPropertyNode);
TileAgeCache::const_iterator it = _completedTiles.begin();
for (; it != _completedTiles.end(); ++it) {
SGPropertyNode* entry = cacheRoot->addChild("entry");
entry->setStringValue("path", it->first);
entry->setIntValue("stamp", it->second);
}
auto it = std::find_if(syncSlot.queue.begin(), syncSlot.queue.end(), [&path](const SyncItem& i) {
return i._dir == path;
});
it = _notFoundItems.begin();
for (; it != _notFoundItems.end(); ++it) {
SGPropertyNode* entry = cacheRoot->addChild("not-found");
entry->setStringValue("path", it->first);
entry->setIntValue("stamp", it->second);
if (it != syncSlot.queue.end()) {
return true;
}
} // of sync slots iteration
writeProperties(f, cacheRoot, true /* write_all */);
f.close();
return false;
}
///////////////////////////////////////////////////////////////////////////////
@ -996,12 +981,9 @@ void SGTerraSync::update(double)
while (_workerThread->hasNewTiles())
{
SyncItem next = _workerThread->getNewTile();
if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) {
_activeTileDirs.erase(next._dir);
// ensure they are popped
_workerThread->getNewTile();
}
} // of freshly synced items
}
bool SGTerraSync::isIdle() {return _workerThread->isIdle();}
@ -1046,17 +1028,16 @@ string_list SGTerraSync::getSceneryPathSuffixes() const
void SGTerraSync::syncAreaByPath(const std::string& aPath)
{
string_list scenerySuffixes = getSceneryPathSuffixes();
string_list::const_iterator it = scenerySuffixes.begin();
if (!_workerThread->isRunning()) {
return;
}
for (; it != scenerySuffixes.end(); ++it)
{
std::string dir = *it + "/" + aPath;
if (_activeTileDirs.find(dir) != _activeTileDirs.end()) {
for (const auto& suffix : getSceneryPathSuffixes()) {
const auto dir = suffix + "/" + aPath;
if (_workerThread->isDirActive(dir)) {
continue;
}
_activeTileDirs.insert(dir);
SyncItem w(dir, SyncItem::Tile);
_workerThread->request( w );
}
@ -1075,13 +1056,9 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
return false;
}
string_list scenerySuffixes = getSceneryPathSuffixes();
string_list::const_iterator it = scenerySuffixes.begin();
for (; it != scenerySuffixes.end(); ++it)
{
string s = *it + "/" + sceneryDir;
if (_activeTileDirs.find(s) != _activeTileDirs.end()) {
for (const auto& suffix : getSceneryPathSuffixes()) {
const auto s = suffix + "/" + sceneryDir;
if (_workerThread->isDirActive(s)) {
return true;
}
}
@ -1091,14 +1068,16 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const
void SGTerraSync::scheduleDataDir(const std::string& dataDir)
{
if (_activeTileDirs.find(dataDir) != _activeTileDirs.end()) {
if (!_workerThread->isRunning()) {
return;
}
if (_workerThread->isDirActive(dataDir)) {
return;
}
_activeTileDirs.insert(dataDir);
SyncItem w(dataDir, SyncItem::AIData);
_workerThread->request( w );
}
bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
@ -1107,7 +1086,7 @@ bool SGTerraSync::isDataDirPending(const std::string& dataDir) const
return false;
}
return (_activeTileDirs.find(dataDir) != _activeTileDirs.end());
return _workerThread->isDirActive(dataDir);
}
void SGTerraSync::reposition()

View File

@ -116,9 +116,6 @@ private:
simgear::TiedPropertyList _tiedProperties;
BufferedLogCallback* _log;
typedef std::set<std::string> string_set;
string_set _activeTileDirs;
};
}

View File

@ -272,20 +272,24 @@ template<class T>
class SGBlockingDeque
{
public:
using value_type = T;
using container_type = std::deque<T>;
/**
* Create a new SGBlockingDequeue.
*/
SGBlockingDeque() {}
SGBlockingDeque() = default;
/**
* Destroy this dequeue.
*/
virtual ~SGBlockingDeque() {}
~SGBlockingDeque() = default;
/**
*
*/
virtual void clear() {
void clear()
{
std::lock_guard<std::mutex> g(mutex);
this->queue.clear();
}
@ -293,7 +297,8 @@ public:
/**
*
*/
virtual bool empty() {
bool empty() const
{
std::lock_guard<std::mutex> g(mutex);
return this->queue.empty();
}
@ -303,9 +308,10 @@ public:
*
* @param item The object to add.
*/
virtual void push_front( const T& item ) {
void push_front(const T& item)
{
std::lock_guard<std::mutex> g(mutex);
this->queue.push_front( item );
this->queue.push_front(item);
not_empty.signal();
}
@ -314,9 +320,10 @@ public:
*
* @param item The object to add.
*/
virtual void push_back( const T& item ) {
void push_back(const T& item)
{
std::lock_guard<std::mutex> g(mutex);
this->queue.push_back( item );
this->queue.push_back(item);
not_empty.signal();
}
@ -326,7 +333,8 @@ public:
*
* @return The next available object.
*/
virtual T front() {
T front() const
{
std::lock_guard<std::mutex> g(mutex);
assert(this->queue.empty() != true);
@ -342,7 +350,8 @@ public:
*
* @return The next available object.
*/
virtual T pop_front() {
T pop_front()
{
std::lock_guard<std::mutex> g(mutex);
while (this->queue.empty())
@ -362,7 +371,8 @@ public:
*
* @return The next available object.
*/
virtual T pop_back() {
T pop_back()
{
std::lock_guard<std::mutex> g(mutex);
while (this->queue.empty())
@ -381,7 +391,8 @@ public:
*
* @return Size of queue.
*/
virtual size_t size() {
size_t size() const
{
std::lock_guard<std::mutex> g(mutex);
return this->queue.size();
}
@ -391,12 +402,19 @@ public:
while (this->queue.empty())
not_empty.wait(mutex);
}
container_type copy() const
{
std::lock_guard<std::mutex> g(mutex);
return queue;
}
private:
/**
* Mutex to serialise access.
*/
std::mutex mutex;
mutable std::mutex mutex;
/**
* Condition to signal when queue not empty.
@ -409,7 +427,7 @@ private:
SGBlockingDeque& operator=( const SGBlockingDeque& );
protected:
std::deque<T> queue;
container_type queue;
};
#endif // SGQUEUE_HXX_INCLUDED