Revert partial-update mode for TerraSync repos.

Going to implement this a different, simpler way now,
This commit is contained in:
James Turner 2016-11-19 13:50:53 +00:00
parent bd88bf1126
commit 2aaad212e8
4 changed files with 27 additions and 405 deletions

View File

@ -57,6 +57,8 @@ namespace simgear
{
}
virtual void cancel();
size_t contentSize() const
{
return _contentSize;
@ -102,7 +104,6 @@ public:
hashCacheDirty(false),
p(parent),
isUpdating(false),
updateEverything(false),
status(HTTPRepository::REPO_NO_ERROR),
totalDownloaded(0)
{ ; }
@ -114,14 +115,10 @@ public:
std::string baseUrl;
SGPath basePath;
bool isUpdating;
bool updateEverything;
string_list updatePaths;
HTTPRepository::ResultCode status;
HTTPDirectory* rootDir;
size_t totalDownloaded;
void updateWaiting();
HTTP::Request_ptr updateFile(HTTPDirectory* dir, const std::string& name,
size_t sz);
HTTP::Request_ptr updateDir(HTTPDirectory* dir, const std::string& hash,
@ -196,12 +193,10 @@ class HTTPDirectory
typedef std::vector<ChildInfo> ChildInfoList;
ChildInfoList children;
public:
HTTPDirectory(HTTPRepoPrivate* repo, const std::string& path) :
_repository(repo),
_relativePath(path),
_state(DoNotUpdate)
_relativePath(path)
{
assert(repo);
@ -238,8 +233,6 @@ public:
fpath.append(".dirindex");
_repository->updatedFileContents(fpath, hash);
_state = Updated;
children.clear();
parseDirIndex(children);
std::sort(children.begin(), children.end());
@ -247,7 +240,6 @@ public:
void failedToUpdate(HTTPRepository::ResultCode status)
{
_state = UpdateFailed;
if (_relativePath.empty()) {
// root dir failed
_repository->failedToGetRootIndex(status);
@ -261,7 +253,7 @@ public:
if (_repository->installedCopyPath.isNull()) {
return;
}
string_list indexNames = indexChildren();
const_string_list_iterator nameIt = indexNames.begin();
for (; nameIt != indexNames.end(); ++nameIt) {
@ -284,7 +276,7 @@ public:
}
SG_LOG(SG_TERRASYNC, SG_BULK, "new child, copying existing file" << cp << p);
SGBinaryFile src(cp);
SGBinaryFile dst(p);
src.open(SG_IO_IN);
@ -306,11 +298,7 @@ public:
void updateChildrenBasedOnHash()
{
// if we got here for a dir which is still updating or excluded
// from updates, just bail out right now.
if (_state != Updated) {
return;
}
//SG_LOG(SG_TERRASYNC, SG_DEBUG, "updated children for:" << relativePath());
copyInstalledChildren();
@ -345,9 +333,6 @@ public:
SG_LOG(SG_TERRASYNC, SG_DEBUG, "file exists hash is good:" << it->file() );
if (c->type == ChildInfo::DirectoryType) {
HTTPDirectory* childDir = childDirectory(it->file());
if (childDir->_state == NotUpdated) {
childDir->_state = Updated;
}
childDir->updateChildrenBasedOnHash();
}
}
@ -365,95 +350,6 @@ public:
scheduleUpdates(toBeUpdated);
}
void markAsUpToDate()
{
_state = Updated;
}
void markAsUpdating()
{
assert(_state == NotUpdated);
_state = HTTPDirectory::UpdateInProgress;
}
void markAsEnabled()
{
// assert because this should only get invoked on newly created
// directory objects which are inside the sub-tree(s) to be updated
assert(_state == DoNotUpdate);
_state = NotUpdated;
}
void markSubtreeAsNeedingUpdate()
{
if (_state == Updated) {
_state = NotUpdated; // reset back to not-updated
}
ChildInfoList::iterator cit;
for (cit = children.begin(); cit != children.end(); ++cit) {
if (cit->type == ChildInfo::DirectoryType) {
HTTPDirectory* childDir = childDirectory(cit->name);
childDir->markSubtreeAsNeedingUpdate();
}
} // of child iteration
}
void markSubtreeAsEnabled()
{
if (_state == DoNotUpdate) {
markAsEnabled();
}
ChildInfoList::iterator cit;
for (cit = children.begin(); cit != children.end(); ++cit) {
if (cit->type == ChildInfo::DirectoryType) {
HTTPDirectory* childDir = childDirectory(cit->name);
childDir->markSubtreeAsEnabled();
}
} // of child iteration
}
void markAncestorChainAsEnabled()
{
if (_state == DoNotUpdate) {
markAsEnabled();
}
if (_relativePath.empty()) {
return;
}
std::string prPath = SGPath(_relativePath).dir();
if (prPath.empty()) {
_repository->rootDir->markAncestorChainAsEnabled();
} else {
HTTPDirectory* prDir = _repository->getOrCreateDirectory(prPath);
prDir->markAncestorChainAsEnabled();
}
}
void updateIfWaiting(const std::string& hash, size_t sz)
{
if (_state == NotUpdated) {
_repository->updateDir(this, hash, sz);
return;
}
if ((_state == DoNotUpdate) || (_state == UpdateInProgress)) {
return;
}
ChildInfoList::iterator cit;
for (cit = children.begin(); cit != children.end(); ++cit) {
if (cit->type == ChildInfo::DirectoryType) {
HTTPDirectory* childDir = childDirectory(cit->name);
childDir->updateIfWaiting(cit->hash, cit->sizeInBytes);
}
} // of child iteration
}
HTTPDirectory* childDirectory(const std::string& name)
{
std::string childPath = relativePath().empty() ? name : relativePath() + "/" + name;
@ -494,11 +390,6 @@ public:
_repository->updateFile(this, *it, cit->sizeInBytes);
} else {
HTTPDirectory* childDir = childDirectory(*it);
if (childDir->_state == DoNotUpdate) {
SG_LOG(SG_TERRASYNC, SG_WARN, "scheduleUpdate, child:" << *it << " is marked do not update so skipping");
continue;
}
_repository->updateDir(childDir, cit->hash, cit->sizeInBytes);
}
}
@ -656,16 +547,7 @@ private:
HTTPRepoPrivate* _repository;
std::string _relativePath; // in URL and file-system space
typedef enum
{
NotUpdated,
UpdateInProgress,
Updated,
UpdateFailed,
DoNotUpdate
} State;
State _state;
};
HTTPRepository::HTTPRepository(const SGPath& base, HTTP::Client *cl) :
@ -703,38 +585,14 @@ SGPath HTTPRepository::fsBase() const
void HTTPRepository::update()
{
_d->rootDir->markSubtreeAsNeedingUpdate();
_d->updateWaiting();
}
void HTTPRepository::setEntireRepositoryMode()
{
if (!_d->updateEverything) {
// this is a one-way decision
_d->updateEverything = true;
}
// probably overkill but not expensive so let's check everything
// we have in case someone did something funky and switched from partial
// to 'whole repo' updating.
_d->rootDir->markSubtreeAsEnabled();
}
void HTTPRepository::addSubpath(const std::string& relPath)
{
if (_d->updateEverything) {
SG_LOG(SG_TERRASYNC, SG_WARN, "called HTTPRepository::addSubpath but updating everything");
if (_d->isUpdating) {
return;
}
_d->updatePaths.push_back(relPath);
HTTPDirectory* dir = _d->getOrCreateDirectory(relPath);
dir->markSubtreeAsEnabled();
dir->markAncestorChainAsEnabled();
_d->updateWaiting();
_d->status = REPO_NO_ERROR;
_d->isUpdating = true;
_d->failures.clear();
_d->updateDir(_d->rootDir, std::string(), 0);
}
bool HTTPRepository::isDoingSync() const
@ -794,6 +652,12 @@ HTTPRepository::failure() const
return _d->status;
}
void HTTPRepoGetRequest::cancel()
{
_directory->repository()->http->cancelRequest(this, "Reposiotry cancelled");
_directory = 0;
}
class FileGetRequest : public HTTPRepoGetRequest
{
public:
@ -825,7 +689,6 @@ HTTPRepository::failure() const
virtual void onDone()
{
file->close();
if (responseCode() == 200) {
std::string hash = strutils::encodeHex(sha1_result(&hashContext), HASH_LENGTH);
_directory->didUpdateFile(fileName, hash, contentSize());
@ -922,8 +785,8 @@ HTTPRepository::failure() const
of.write(body.data(), body.size());
of.close();
_directory->dirIndexUpdated(hash);
} else {
_directory->markAsUpToDate();
//SG_LOG(SG_TERRASYNC, SG_INFO, "updated dir index " << _directory->absolutePath());
}
_directory->repository()->totalDownloaded += contentSize();
@ -934,7 +797,7 @@ HTTPRepository::failure() const
SGTimeStamp st;
st.stamp();
_directory->updateChildrenBasedOnHash();
SG_LOG(SG_TERRASYNC, SG_INFO, "after update of:" << _directory->absolutePath() << " child update took:" << st.elapsedMSec());
SG_LOG(SG_TERRASYNC, SG_DEBUG, "after update of:" << _directory->absolutePath() << " child update took:" << st.elapsedMSec());
} catch (sg_exception& ) {
_directory->failedToUpdate(HTTPRepository::REPO_ERROR_IO);
}
@ -999,7 +862,6 @@ HTTPRepository::failure() const
HTTP::Request_ptr HTTPRepoPrivate::updateDir(HTTPDirectory* dir, const std::string& hash, size_t sz)
{
dir->markAsUpdating();
RepoRequestPtr r(new DirGetRequest(dir, hash));
r->setContentSize(sz);
makeRequest(r);
@ -1166,25 +1028,6 @@ HTTPRepository::failure() const
HTTPDirectory* d = new HTTPDirectory(this, path);
directories.push_back(d);
if (updateEverything) {
d->markAsEnabled();
} else {
string_list::const_iterator s;
bool shouldUpdate = false;
for (s = updatePaths.begin(); s != updatePaths.end(); ++s) {
size_t minLen = std::min(path.size(), s->size());
if (s->compare(0, minLen, path, 0, minLen) == 0) {
shouldUpdate = true;
break;
}
} // of paths iteration
if (shouldUpdate) {
d->markAsEnabled();
}
}
return d;
}
@ -1280,21 +1123,4 @@ HTTPRepository::failure() const
SG_LOG(SG_TERRASYNC, SG_WARN, "failed to update entry:" << relativePath << " code:" << fileStatus);
}
void HTTPRepoPrivate::updateWaiting()
{
if (!isUpdating) {
status = HTTPRepository::REPO_NO_ERROR;
isUpdating = true;
failures.clear();
}
// find to-be-updated sub-trees and kick them off
rootDir->updateIfWaiting(std::string(), 0);
// maybe there was nothing to do
if (activeRequests.empty()) {
isUpdating = false;
}
}
} // of namespace simgear

View File

@ -57,13 +57,6 @@ public:
virtual void update();
/**
* set if we should sync the entire repository
*/
void setEntireRepositoryMode();
void addSubpath(const std::string& relPath);
virtual bool isDoingSync() const;
virtual ResultCode failure() const;

View File

@ -408,7 +408,6 @@ void testBasicClone(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
@ -446,7 +445,6 @@ void testModifyLocalFiles(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
@ -488,7 +486,6 @@ void testMergeExistingFileWithoutDownload(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
createFile(p, "dirC/fileCB", 4); // should match
createFile(p, "dirC/fileCC", 3); // mismatch
@ -531,7 +528,6 @@ void testLossOfLocalFiles(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
verifyFileState(p, "dirB/subdirA/fileBAA");
@ -569,7 +565,6 @@ void testAbandonMissingFiles(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
if (repo->failure() != HTTPRepository::REPO_PARTIAL_UPDATE) {
@ -594,11 +589,10 @@ void testAbandonCorruptFiles(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
if (repo->failure() != HTTPRepository::REPO_ERROR_CHECKSUM) {
std::cerr << "Got failure state:" << repo->failure() << std::endl;
throw sg_exception("Bad result from corrupt files test");
}
@ -611,149 +605,6 @@ void testAbandonCorruptFiles(HTTP::Client* cl)
std::cout << "Passed test: detect corrupted download" << std::endl;
}
void testPartialUpdateBasic(HTTP::Client* cl)
{
std::auto_ptr<HTTPRepository> repo;
SGPath p(simgear::Dir::current().path());
p.append("http_repo_partial_update");
simgear::Dir pd(p);
if (pd.exists()) {
pd.removeChildren();
}
global_repo->clearRequestCounts();
global_repo->clearFailFlags();
global_repo->defineFile("dirA/subdirF/fileAFA");
global_repo->defineFile("dirA/subdirF/fileAFB");
global_repo->defineFile("dirA/subdirH/fileAHA");
global_repo->defineFile("dirA/subdirH/fileAHB");
global_repo->defineFile("dirG/subdirA/subsubA/fileGAAB");
// request subdir of A
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->addSubpath("dirA/subdirF");
waitForUpdateComplete(cl, repo.get());
verifyFileState(p, "dirA/subdirF/fileAFA");
verifyFileState(p, "dirA/subdirF/fileAFB");
verifyFileState(p, "fileA"); // files are always synced
verifyFileState(p, "dirA/fileAB");
verifyFileNotPresent(p, "dirB/subdirB/fileBBB");
verifyFileNotPresent(p, "dirD");
verifyFileNotPresent(p, "dirA/subdirH/fileAHB");
verifyRequestCount("dirA", 1);
verifyRequestCount("dirA/fileAA", 1);
verifyRequestCount("dirA/subdirF", 1);
verifyRequestCount("dirA/subdirF/fileAFA", 1);
verifyRequestCount("dirA/subdirF/fileAFB", 1);
verifyRequestCount("dirB", 0);
verifyRequestCount("dirG", 0);
// now request dir B
repo->addSubpath("dirB");
waitForUpdateComplete(cl, repo.get());
verifyFileState(p, "dirA/subdirF/fileAFB");
verifyFileState(p, "dirB/subdirB/fileBBA");
verifyFileState(p, "dirB/subdirB/fileBBB");
verifyRequestCount("dirB", 1);
verifyRequestCount("dirB/subdirA/fileBAC", 1);
verifyRequestCount("dirA", 1);
verifyRequestCount("dirA/fileAA", 1);
verifyRequestCount("dirG", 0);
// widen subdir to parent
repo->addSubpath("dirA");
waitForUpdateComplete(cl, repo.get());
verifyFileState(p, "dirA/subdirH/fileAHA");
verifyFileState(p, "dirA/subdirH/fileAHB");
verifyRequestCount("dirA", 1);
verifyRequestCount("dirB/subdirA/fileBAC", 1);
verifyRequestCount("dirA/subdirF/fileAFA", 1);
// request an already fetched subdir - should be a no-op
repo->addSubpath("dirB/subdirB");
waitForUpdateComplete(cl, repo.get());
verifyRequestCount("dirB", 1);
verifyRequestCount("dirB/subdirB/fileBBB", 1);
// add new / modify files inside
global_repo->defineFile("dirA/subdirF/fileAFC");
global_repo->defineFile("dirA/subdirF/fileAFD");
repo->update();
waitForUpdateComplete(cl, repo.get());
if (global_repo->requestCount != 2) {
throw sg_exception("Bad root request count");
}
verifyFileState(p, "dirA/subdirF/fileAFC");
verifyFileState(p, "dirA/subdirF/fileAFD");
std::cout << "Passed test: basic partial clone and update" << std::endl;
}
void testPartialUpdateExisting(HTTP::Client* cl)
{
std::auto_ptr<HTTPRepository> repo;
SGPath p(simgear::Dir::current().path());
p.append("http_repo_partial_update_existing");
simgear::Dir pd(p);
if (pd.exists()) {
pd.removeChildren();
}
global_repo->clearRequestCounts();
global_repo->clearFailFlags();
// full update to sync everything
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
waitForUpdateComplete(cl, repo.get());
// new repo for partial
global_repo->clearRequestCounts();
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->addSubpath("dirA/subdirF");
waitForUpdateComplete(cl, repo.get());
if (global_repo->requestCount != 1) {
throw sg_exception("Bad root request count");
}
verifyRequestCount("dirA", 0);
verifyRequestCount("dirA/fileAA", 0);
verifyRequestCount("dirA/subdirF", 0);
verifyRequestCount("dirA/subdirF/fileAFA", 0);
verifyRequestCount("dirA/subdirF/fileAFB", 0);
// and request more dirs
// this is a good simulation of terrasync requesting more subdirs of
// an already created and in sync tree. should not generate any more
// network trip
repo->addSubpath("dirC");
verifyFileState(p, "dirC/subdirA/subsubA/fileCAAA");
verifyRequestCount("dirC/subdirA/subsubA/fileCAAA", 0);
if (global_repo->requestCount != 1) {
throw sg_exception("Bad root request count");
}
std::cout << "Passed test: partial update of existing" << std::endl;
}
void modifyBTree()
{
std::cout << "Modifying sub-tree" << std::endl;
@ -763,50 +614,6 @@ void modifyBTree()
global_repo->findEntry("dirB/subdirB/fileBBB")->revision++;
}
void testPartialUpdateWidenWhileInProgress(HTTP::Client* cl)
{
std::auto_ptr<HTTPRepository> repo;
SGPath p(simgear::Dir::current().path());
p.append("http_repo_partial_update_widen");
simgear::Dir pd(p);
if (pd.exists()) {
pd.removeChildren();
}
global_repo->clearRequestCounts();
global_repo->clearFailFlags();
// full update to sync everything
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->addSubpath("dirA/subdirF");
repo->addSubpath("dirB/subdirB");
waitForUpdateComplete(cl, repo.get());
verifyRequestCount("dirA/subdirF", 1);
if (global_repo->requestCount != 1) {
throw sg_exception("Bad root request count");
}
repo->addSubpath("dirA");
repo->addSubpath("dirB");
repo->addSubpath("dirC");
waitForUpdateComplete(cl, repo.get());
// should not request the root again
verifyRequestCount("dirA/subdirF", 1);
if (global_repo->requestCount != 1) {
throw sg_exception("Bad root request count");
}
verifyFileState(p, "dirA/subdirF/fileAFA");
verifyFileState(p, "dirC/subdirA/subsubA/fileCAAA");
std::cout << "Passed test: partial update with widen" << std::endl;
}
void testServerModifyDuringSync(HTTP::Client* cl)
{
std::auto_ptr<HTTPRepository> repo;
@ -822,7 +629,6 @@ void testServerModifyDuringSync(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
global_repo->findEntry("dirA/fileAA")->accessCallback.reset(make_callback(&modifyBTree));
@ -832,7 +638,7 @@ void testServerModifyDuringSync(HTTP::Client* cl)
global_repo->findEntry("dirA/fileAA")->accessCallback.reset();
if (repo->failure() != HTTPRepository::REPO_ERROR_CHECKSUM) {
throw sg_exception("Bad result from corrupt files test");
throw sg_exception("Bad result from modify during sync test");
}
std::cout << "Passed test modify server during sync" << std::endl;
@ -854,7 +660,6 @@ void testDestroyDuringSync(HTTP::Client* cl)
repo.reset(new HTTPRepository(p, cl));
repo->setBaseUrl("http://localhost:2000/repo");
repo->setEntireRepositoryMode();
repo->update();
@ -872,7 +677,7 @@ void testDestroyDuringSync(HTTP::Client* cl)
int main(int argc, char* argv[])
{
sglog().setLogLevels( SG_ALL, SG_DEBUG );
sglog().setLogLevels( SG_ALL, SG_INFO );
HTTP::Client cl;
cl.setMaxConnections(1);
@ -907,13 +712,12 @@ int main(int argc, char* argv[])
testServer.disconnectAll();
cl.clearAllConnections();
testPartialUpdateBasic(&cl);
testPartialUpdateExisting(&cl);
testPartialUpdateWidenWhileInProgress(&cl);
testServerModifyDuringSync(&cl);
testDestroyDuringSync(&cl);
testServer.disconnectAll();
cl.clearAllConnections();
std::cout << "all tests passed ok" << std::endl;
return 0;
}

View File

@ -565,7 +565,6 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
} // of creating directory step
slot.repository.reset(new HTTPRepository(path, &_http));
slot.repository->setEntireRepositoryMode();
slot.repository->setBaseUrl(_httpServer + "/" + slot.currentItem._dir);
if (_installRoot.exists()) {