TerraSync: validate local dirs incrementally

Add a process() method to HTTPRepository, and use this to
incrementally validate subdirs after the .dirIndex is received. This 
avoids large pauses of the TerraSync thread, when all of Airports/
is validated at once.
This commit is contained in:
James Turner 2020-10-30 11:51:26 +00:00 committed by James Turner
parent 96bafef3f3
commit ec3829addb
5 changed files with 57 additions and 5 deletions

View File

@ -274,7 +274,7 @@ public:
if (c.type == HTTPRepository::DirectoryType) {
// If it's a directory,perform a recursive check.
HTTPDirectory *childDir = childDirectory(c.name);
childDir->updateChildrenBasedOnHash();
_repository->scheduleUpdateOfChildren(childDir);
}
}
} // of repository-defined (well, .dirIndex) children iteration
@ -654,6 +654,33 @@ bool HTTPRepository::isDoingSync() const
return _d->isUpdating;
}
void HTTPRepository::process()
{
int processedCount = 0;
const int maxToProcess = 16;
while (processedCount < maxToProcess) {
if (_d->pendingUpdateOfChildren.empty()) {
break;
}
auto dirToUpdate = _d->pendingUpdateOfChildren.front();
_d->pendingUpdateOfChildren.pop_front();
dirToUpdate->updateChildrenBasedOnHash();
++processedCount;
}
_d->checkForComplete();
}
void HTTPRepoPrivate::checkForComplete()
{
if (pendingUpdateOfChildren.empty() && activeRequests.empty() && queuedRequests.empty()) {
isUpdating = false;
writeHashCache();
}
}
size_t HTTPRepository::bytesToDownload() const
{
size_t result = 0;
@ -1208,10 +1235,7 @@ HTTPRepository::failure() const
writeHashCache();
}
if (activeRequests.empty() && queuedRequests.empty()) {
isUpdating = false;
writeHashCache();
}
checkForComplete();
}
void HTTPRepoPrivate::failedToGetRootIndex(HTTPRepository::ResultCode st)
@ -1272,4 +1296,14 @@ HTTPRepository::failure() const
failures.end());
}
void HTTPRepoPrivate::scheduleUpdateOfChildren(HTTPDirectory* dir)
{
auto it = std::find(pendingUpdateOfChildren.begin(), pendingUpdateOfChildren.end(), dir);
if (it != pendingUpdateOfChildren.end()) {
return; // duplicate add, skip
}
pendingUpdateOfChildren.push_back(dir);
}
} // of namespace simgear

View File

@ -62,6 +62,11 @@ public:
virtual bool isDoingSync() const;
/**
@brief call this periodically to progress non-network tasks
*/
void process();
virtual ResultCode failure() const;
virtual size_t bytesToDownload() const;

View File

@ -19,6 +19,7 @@
#pragma once
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
@ -102,6 +103,8 @@ public:
void updatedChildSuccessfully(const SGPath &relativePath);
void checkForComplete();
typedef std::vector<RepoRequestPtr> RequestVector;
RequestVector queuedRequests, activeRequests;
@ -114,9 +117,16 @@ public:
HTTPDirectory *getOrCreateDirectory(const std::string &path);
bool deleteDirectory(const std::string &relPath, const SGPath &absPath);
void scheduleUpdateOfChildren(HTTPDirectory* dir);
typedef std::vector<HTTPDirectory_ptr> DirectoryVector;
DirectoryVector directories;
// list of directories to be locally processed
// this is used to avoid deep recursion when receiving the .dirIndex;
// we don't want to recurse over a large repo in a single call
std::deque<HTTPDirectory*> pendingUpdateOfChildren;
SGPath installedCopyPath;
};

View File

@ -409,6 +409,7 @@ void waitForUpdateComplete(HTTP::Client* cl, HTTPRepository* repo)
cl->update();
testServer.poll();
repo->process();
if (!repo->isDoingSync()) {
return;
}
@ -423,6 +424,7 @@ void runForTime(HTTP::Client *cl, HTTPRepository *repo, int msec = 15) {
while (start.elapsedMSec() < msec) {
cl->update();
testServer.poll();
repo->process();
SGTimeStamp::sleepForMSec(1);
}
}

View File

@ -547,6 +547,7 @@ void SGTerraSync::WorkerThread::run()
void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
{
if (slot.repository.get()) {
slot.repository->process();
if (slot.repository->isDoingSync()) {
#if 1
if (slot.stamp.elapsedMSec() > (int)slot.nextWarnTimeout) {